Compare commits

...

18 Commits

Author SHA1 Message Date
RamiBerm
35dbd5fde2 TRA-3860 create main configmap for agent and tappers (#410)
* WIP

* Update options.go and serializable_regexp.go

* Update go.sum, go.sum, and 4 more files...

* Update go.sum, go.sum, and 4 more files...

* Update config.go and serializable_regexp.go

* Update config.go, config.json, and test.go

* Update tapRunner.go and provider.go

* Update provider.go

* Update tapRunner.go and provider.go

* Update config.json and test.go

* Update contract_validation.go, config.go, and 2 more files...

* Update main.go

* Update rulesHTTP.go

* Update config.go, size_enforcer.go, and 5 more files...

* Update config.go and config.go

Co-authored-by: Rami Berman <rami.berman@up9.com>
2021-10-31 15:29:05 +02:00
Igor Gov
2c29903910 Run snyk on all project with ignore current issues (#415) 2021-10-28 21:59:42 +03:00
Igor Gov
f49e29045c Adding Snyk to CI (#414) 2021-10-28 16:37:11 +03:00
Alex Haiut
67593345a7 upgraded base image to alpine:3.14 following snyk vuln report (#412) 2021-10-28 15:00:11 +03:00
RoyUP9
f069fdaae1 changed ask confirmation to be asked only once (#409) 2021-10-28 14:55:33 +03:00
gadotroee
d478a7ec43 TRA-3867 proxy host from config (#411) 2021-10-28 14:53:28 +03:00
gadotroee
af96e55f61 running proxy on 0.0.0.0 (#408) 2021-10-28 11:44:29 +03:00
RoyUP9
e3ead981ec fixed upload entries (#407) 2021-10-28 11:20:45 +03:00
RoyUP9
fbee4454e4 added k8s server version check (#404) 2021-10-27 09:35:30 +03:00
RamiBerm
e9e16551ad TRA-3850 mizu clean command + fix debug deploy (#403)
* debug mizu

* Update clean.go, cleanRunner.go, and tapRunner.go

* Update tapRunner.go

* Update cleanRunner.go

* Update tapRunner.go

Co-authored-by: Rami Berman <rami.berman@up9.com>
2021-10-27 09:28:54 +03:00
David Levanon
e635b97d11 remove duplicated tcp packet source (#405) 2021-10-26 18:16:33 +03:00
RoyUP9
779257b864 added alert and error when there is a proxy before k8b server (#402) 2021-10-26 16:57:56 +03:00
David Levanon
e696851261 Tapper Refactor (#396)
* introduce tcp_assembler and tcp_packet_source - the motivation is to … (#380)

* add passive-tapper main tester (#353)

* add passive-tapper main tester

* add errors to go.sum of mizu agent

* disable host mode for tester - to avoid filterAuthorities

* rename main to tester

* build extenssions as part of the tester launch

* add a README to the tester

* solving go.mod and .sum conflicts with addition of go-errors

* trivial warning fixes (#354)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* disable host mode for tester - to avoid filterAuthorities

* tcp streams map (#355)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* change rlog to mizu logger

* errors map (#356)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* errors map

* change int to uint - errorsmap

* change from int to uint

* Change errorsMap.nErrors to uint.

* change errors map to mizu logger instead of rlog

* init mizu logger in tester + fix errormap declaration

Co-authored-by: Nimrod Gilboa Markevich <nimrod@up9.com>

* move own ips to tcp stream factory (#358)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* errors map

* move own ips to tcp stream factory

* Feature/tapper refactor i/move own ips to tcp stream factory (#379)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* errors map

* move own ips to tcp stream factory

* fix ownips compilation issue

* introduce tcp_assembler and tcp_packet_source - the motivation is to split the actual packet sources from the assembler, so we can have a single thread for the assembly which is separated from packet source threads

* make struts private at this point - planning to move some packages to their own package so we can utilize encapsulation

* move context to tcp_assembly + fix error check of tcp source packet

* use param instead of gloab flag for ipdefrag

* fix a typo

* remove unused pid param

Co-authored-by: Nimrod Gilboa Markevich <nimrod@up9.com>

* extract stats functions out of the main tapping function (#381)

* add passive-tapper main tester (#353)

* add passive-tapper main tester

* add errors to go.sum of mizu agent

* disable host mode for tester - to avoid filterAuthorities

* rename main to tester

* build extenssions as part of the tester launch

* add a README to the tester

* solving go.mod and .sum conflicts with addition of go-errors

* trivial warning fixes (#354)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* disable host mode for tester - to avoid filterAuthorities

* tcp streams map (#355)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* change rlog to mizu logger

* errors map (#356)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* errors map

* change int to uint - errorsmap

* change from int to uint

* Change errorsMap.nErrors to uint.

* change errors map to mizu logger instead of rlog

* init mizu logger in tester + fix errormap declaration

Co-authored-by: Nimrod Gilboa Markevich <nimrod@up9.com>

* move own ips to tcp stream factory (#358)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* errors map

* move own ips to tcp stream factory

* Feature/tapper refactor i/move own ips to tcp stream factory (#379)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* errors map

* move own ips to tcp stream factory

* fix ownips compilation issue

* introduce tcp_assembler and tcp_packet_source - the motivation is to split the actual packet sources from the assembler, so we can have a single thread for the assembly which is separated from packet source threads

* make struts private at this point - planning to move some packages to their own package so we can utilize encapsulation

* extract stats functions out of the main tapping function

* move context to tcp_assembly + fix error check of tcp source packet

* use param instead of gloab flag for ipdefrag

Co-authored-by: Nimrod Gilboa Markevich <nimrod@up9.com>

* Feature/tapper refactor i/internal tapper stats (#384)

* add passive-tapper main tester (#353)

* add passive-tapper main tester

* add errors to go.sum of mizu agent

* disable host mode for tester - to avoid filterAuthorities

* rename main to tester

* build extenssions as part of the tester launch

* add a README to the tester

* solving go.mod and .sum conflicts with addition of go-errors

* trivial warning fixes (#354)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* disable host mode for tester - to avoid filterAuthorities

* tcp streams map (#355)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* change rlog to mizu logger

* errors map (#356)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* errors map

* change int to uint - errorsmap

* change from int to uint

* Change errorsMap.nErrors to uint.

* change errors map to mizu logger instead of rlog

* init mizu logger in tester + fix errormap declaration

Co-authored-by: Nimrod Gilboa Markevich <nimrod@up9.com>

* move own ips to tcp stream factory (#358)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* errors map

* move own ips to tcp stream factory

* Feature/tapper refactor i/move own ips to tcp stream factory (#379)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* errors map

* move own ips to tcp stream factory

* fix ownips compilation issue

* introduce tcp_assembler and tcp_packet_source - the motivation is to split the actual packet sources from the assembler, so we can have a single thread for the assembly which is separated from packet source threads

* make struts private at this point - planning to move some packages to their own package so we can utilize encapsulation

* extract stats functions out of the main tapping function

* move context to tcp_assembly + fix error check of tcp source packet

* use param instead of gloab flag for ipdefrag

* introduce internal tapper stats

* minor fixes for errors map and internal stats

Co-authored-by: Nimrod Gilboa Markevich <nimrod@up9.com>

* Feature/tapper refactor i/diagnose package (#386)

* add passive-tapper main tester (#353)

* add passive-tapper main tester

* add errors to go.sum of mizu agent

* disable host mode for tester - to avoid filterAuthorities

* rename main to tester

* build extenssions as part of the tester launch

* add a README to the tester

* solving go.mod and .sum conflicts with addition of go-errors

* trivial warning fixes (#354)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* disable host mode for tester - to avoid filterAuthorities

* tcp streams map (#355)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* change rlog to mizu logger

* errors map (#356)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* errors map

* change int to uint - errorsmap

* change from int to uint

* Change errorsMap.nErrors to uint.

* change errors map to mizu logger instead of rlog

* init mizu logger in tester + fix errormap declaration

Co-authored-by: Nimrod Gilboa Markevich <nimrod@up9.com>

* move own ips to tcp stream factory (#358)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* errors map

* move own ips to tcp stream factory

* Feature/tapper refactor i/move own ips to tcp stream factory (#379)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* errors map

* move own ips to tcp stream factory

* fix ownips compilation issue

* introduce tcp_assembler and tcp_packet_source - the motivation is to split the actual packet sources from the assembler, so we can have a single thread for the assembly which is separated from packet source threads

* make struts private at this point - planning to move some packages to their own package so we can utilize encapsulation

* extract stats functions out of the main tapping function

* move context to tcp_assembly + fix error check of tcp source packet

* use param instead of gloab flag for ipdefrag

* introduce internal tapper stats

* minor fixes for errors map and internal stats

* move errors map + app stats + internal stats + periodic tasks to diagnose package

* initialize tapper internal stats

Co-authored-by: Nimrod Gilboa Markevich <nimrod@up9.com>

* move tcp packet source to its packet (#387)

* add passive-tapper main tester (#353)

* add passive-tapper main tester

* add errors to go.sum of mizu agent

* disable host mode for tester - to avoid filterAuthorities

* rename main to tester

* build extenssions as part of the tester launch

* add a README to the tester

* solving go.mod and .sum conflicts with addition of go-errors

* trivial warning fixes (#354)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* disable host mode for tester - to avoid filterAuthorities

* tcp streams map (#355)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* change rlog to mizu logger

* errors map (#356)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* errors map

* change int to uint - errorsmap

* change from int to uint

* Change errorsMap.nErrors to uint.

* change errors map to mizu logger instead of rlog

* init mizu logger in tester + fix errormap declaration

Co-authored-by: Nimrod Gilboa Markevich <nimrod@up9.com>

* move own ips to tcp stream factory (#358)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* errors map

* move own ips to tcp stream factory

* Feature/tapper refactor i/move own ips to tcp stream factory (#379)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* errors map

* move own ips to tcp stream factory

* fix ownips compilation issue

* introduce tcp_assembler and tcp_packet_source - the motivation is to split the actual packet sources from the assembler, so we can have a single thread for the assembly which is separated from packet source threads

* make struts private at this point - planning to move some packages to their own package so we can utilize encapsulation

* extract stats functions out of the main tapping function

* move context to tcp_assembly + fix error check of tcp source packet

* use param instead of gloab flag for ipdefrag

* introduce internal tapper stats

* minor fixes for errors map and internal stats

* move errors map + app stats + internal stats + periodic tasks to diagnose package

* move tcp packet source to its packet

* initialize tapper internal stats

Co-authored-by: Nimrod Gilboa Markevich <nimrod@up9.com>

* Fix coding style

* Remove `tap/internal_stats.go`

* make channel between input and assembler blocking - to preserve the same behaviour we have before the refactor

Co-authored-by: Nimrod Gilboa Markevich <nimrod@up9.com>
Co-authored-by: M. Mert Yildiran <mehmet@up9.com>
2021-10-25 13:59:06 +03:00
RoyUP9
4e50e17d81 build m1 and windows (#395) 2021-10-24 15:10:46 +03:00
Igor Gov
991eb2ab16 Revert "TRA-3828 added build for Mac/Apple M1 and Windows (#392)" (#393)
This reverts commit f0db3b81a8.
2021-10-24 12:27:05 +03:00
Alex Haiut
f0db3b81a8 TRA-3828 added build for Mac/Apple M1 and Windows (#392) 2021-10-24 11:56:12 +03:00
Nimrod Gilboa Markevich
9df1812d8e Add k8s version requirements to README (#389)
A version lower than 1.16.0 fails with the error message: Error updating tappers: 415: Unsupported Media Type.
2021-10-21 15:29:26 +03:00
RoyUP9
4f6da91d74 fixed naming of latency to response time (#388) 2021-10-21 12:45:17 +03:00
47 changed files with 1175 additions and 508 deletions

View File

@@ -14,6 +14,10 @@ jobs:
docker:
runs-on: ubuntu-latest
steps:
- name: Set up Go 1.16
uses: actions/setup-go@v2
with:
go-version: '1.16'
- name: Checkout
uses: actions/checkout@v2
- name: Set up Cloud SDK

View File

@@ -0,0 +1,25 @@
name: Security validation
on:
pull_request:
branches:
- 'develop'
- 'main'
jobs:
security:
name: Check for vulnerabilities
runs-on: ubuntu-latest
env:
SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }}
steps:
- uses: actions/checkout@v2
- uses: snyk/actions/setup@master
- name: Set up Go 1.16
uses: actions/setup-go@v2
with:
go-version: '1.16'
- name: Run snyk on all projects
run: snyk test --all-projects

View File

@@ -44,7 +44,7 @@ RUN go build -ldflags="-s -w \
COPY devops/build_extensions.sh ..
RUN cd .. && /bin/bash build_extensions.sh
FROM alpine:3.13.5
FROM alpine:3.14
RUN apk add bash libpcap-dev tcpdump
WORKDIR /app

View File

@@ -46,6 +46,10 @@ push-docker: ## Build and publish agent docker image.
@echo "publishing Docker image .. "
devops/build-push-featurebranch.sh
push-docker-debug:
@echo "publishing debug Docker image .. "
devops/build-push-featurebranch-debug.sh
build-docker-ci: ## Build agent docker image for CI.
@echo "building docker image for ci"
devops/build-agent-ci.sh

View File

@@ -15,6 +15,10 @@ Think TCPDump and Chrome Dev Tools combined.
- No installation or code instrumentation
- Works completely on premises
## Requirements
A Kubernetes server version of 1.16.0 or higher is required.
## Download
Download Mizu for your platform and operating system
@@ -167,6 +171,16 @@ against the contracts.
Please see [CONTRACT MONITORING](docs/CONTRACT_MONITORING.md) page for more details and syntax.
### Configure proxy host
By default, Mizu is accessible via localhost at 'http://localhost:8899/mizu/'. The host can be changed, for instance to '0.0.0.0', to grant access via the machine's IP address.
This setting can be changed via the command line flag `--set tap.proxy-host=<value>` or via the config file:
tap:
  proxy-host: 0.0.0.0
Once changed, Mizu can be accessed by IP.
## How to Run local UI
- run from mizu/agent `go run main.go --hars-read --hars-dir <folder>`

agent/.snyk (new file, 6 lines added)
View File

@@ -0,0 +1,6 @@
# Snyk (https://snyk.io) policy file, patches or ignores known vulnerabilities.
version: v1.14.0
ignore:
SNYK-GOLANG-GITHUBCOMGINGONICGIN-1041736:
- '*':
reason: None Given

View File

@@ -6,6 +6,7 @@ import (
"fmt"
"io/ioutil"
"mizuserver/pkg/api"
"mizuserver/pkg/config"
"mizuserver/pkg/controllers"
"mizuserver/pkg/models"
"mizuserver/pkg/routes"
@@ -44,6 +45,9 @@ func main() {
logLevel := determineLogLevel()
logger.InitLoggerStderrOnly(logLevel)
flag.Parse()
if err := config.LoadConfig(); err != nil {
logger.Log.Fatalf("Error loading config file %v", err)
}
loadExtensions()
if !*tapperMode && !*apiServerMode && !*standaloneMode && !*harsReaderMode {
@@ -313,3 +317,4 @@ func determineLogLevel() (logLevel logging.Level) {
}
return
}

View File

@@ -24,7 +24,7 @@ const (
)
func loadOAS(ctx context.Context) (doc *openapi3.T, contractContent string, router routers.Router, err error) {
path := fmt.Sprintf("%s/%s", shared.RulePolicyPath, shared.ContractFileName)
path := fmt.Sprintf("%s%s", shared.ConfigDirPath, shared.ContractFileName)
bytes, err := ioutil.ReadFile(path)
if err != nil {
logger.Log.Error(err.Error())

View File

@@ -0,0 +1,57 @@
package config
import (
"encoding/json"
"fmt"
"github.com/up9inc/mizu/shared"
"io/ioutil"
"os"
)
// these values are used when the config.json file is not present
const (
defaultMaxDatabaseSizeBytes int64 = 200 * 1000 * 1000
defaultRegexTarget string = ".*"
)
var Config *shared.MizuAgentConfig
func LoadConfig() error {
if Config != nil {
return nil
}
filePath := fmt.Sprintf("%s%s", shared.ConfigDirPath, shared.ConfigFileName)
content, err := ioutil.ReadFile(filePath)
if err != nil {
if os.IsNotExist(err) {
return applyDefaultConfig()
}
return err
}
if err = json.Unmarshal(content, &Config); err != nil {
return err
}
return nil
}
func applyDefaultConfig() error {
defaultConfig, err := getDefaultConfig()
if err != nil {
return err
}
Config = defaultConfig
return nil
}
func getDefaultConfig() (*shared.MizuAgentConfig, error) {
regex, err := shared.CompileRegexToSerializableRegexp(defaultRegexTarget)
if err != nil {
return nil, err
}
return &shared.MizuAgentConfig{
TapTargetRegex: *regex,
MaxDBSizeBytes: defaultMaxDatabaseSizeBytes,
}, nil
}
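For context, here is a sketch (not part of the diff) of roughly what the agent-side config file could contain and how it decodes. The key names are an assumption based on encoding/json defaults for the MizuAgentConfig struct shown later in this diff (that struct carries only yaml tags), and the type below is a simplified local stand-in:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Simplified stand-in for shared.MizuAgentConfig; the real type wraps the
// regex in a SerializableRegexp that marshals to its pattern string.
type agentConfig struct {
	TapTargetRegex string
	MaxDBSizeBytes int64
}

func main() {
	// Roughly what the CLI would place in /app/config/mizu-config.json
	// (key names are an assumption, see the lead-in above).
	raw := []byte(`{"TapTargetRegex":".*","MaxDBSizeBytes":200000000}`)

	var cfg agentConfig
	if err := json.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("target regex: %q, max DB size: %d bytes\n", cfg.TapTargetRegex, cfg.MaxDBSizeBytes)
}
```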

View File

@@ -13,11 +13,12 @@ import (
)
const (
DBPath = "./entries.db"
OrderDesc = "desc"
OrderAsc = "asc"
LT = "lt"
GT = "gt"
DBPath = "./entries.db"
OrderDesc = "desc"
OrderAsc = "asc"
LT = "lt"
GT = "gt"
TimeFormat = "2006-01-02 15:04:05.000000000"
)
var (
@@ -57,7 +58,7 @@ func initDataBase(databasePath string) *gorm.DB {
return temp
}
func GetEntriesFromDb(timestampFrom int64, timestampTo int64, protocolName *string) []tapApi.MizuEntry {
func GetEntriesFromDb(timeFrom time.Time, timeTo time.Time, protocolName *string) []tapApi.MizuEntry {
order := OrderDesc
protocolNameCondition := "1 = 1"
if protocolName != nil {
@@ -67,7 +68,7 @@ func GetEntriesFromDb(timestampFrom int64, timestampTo int64, protocolName *stri
var entries []tapApi.MizuEntry
GetEntriesTable().
Where(protocolNameCondition).
Where(fmt.Sprintf("timestamp BETWEEN %v AND %v", timestampFrom, timestampTo)).
Where(fmt.Sprintf("created_at BETWEEN '%s' AND '%s'", timeFrom.Format(TimeFormat), timeTo.Format(TimeFormat))).
Order(fmt.Sprintf("timestamp %s", order)).
Find(&entries)
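A self-contained sketch (not from the diff) of the window condition the new query builds; the column name and the nanosecond TimeFormat layout are taken from the change above:

```go
package main

import (
	"fmt"
	"time"
)

const timeFormat = "2006-01-02 15:04:05.000000000"

func main() {
	timeFrom := time.Now().Add(-10 * time.Minute)
	timeTo := time.Now()

	// Equivalent of the Where(...) clause above: keep rows whose
	// created_at falls inside the [timeFrom, timeTo] window.
	condition := fmt.Sprintf("created_at BETWEEN '%s' AND '%s'",
		timeFrom.Format(timeFormat), timeTo.Format(timeFormat))
	fmt.Println(condition)
}
```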

View File

@@ -1,12 +1,11 @@
package database
import (
"mizuserver/pkg/config"
"os"
"strconv"
"time"
"github.com/fsnotify/fsnotify"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/debounce"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/shared/units"
@@ -14,7 +13,6 @@ import (
)
const percentageOfMaxSizeBytesToPrune = 15
const defaultMaxDatabaseSizeBytes int64 = 200 * 1000 * 1000
func StartEnforcingDatabaseSize() {
watcher, err := fsnotify.NewWatcher()
@@ -23,14 +21,8 @@ func StartEnforcingDatabaseSize() {
return
}
maxEntriesDBByteSize, err := getMaxEntriesDBByteSize()
if err != nil {
logger.Log.Fatalf("Error parsing max db size: %v\n", err)
return
}
checkFileSizeDebouncer := debounce.NewDebouncer(5*time.Second, func() {
checkFileSize(maxEntriesDBByteSize)
checkFileSize(config.Config.MaxDBSizeBytes)
})
go func() {
@@ -58,17 +50,6 @@ func StartEnforcingDatabaseSize() {
}
}
func getMaxEntriesDBByteSize() (int64, error) {
maxEntriesDBByteSize := defaultMaxDatabaseSizeBytes
var err error
maxEntriesDBSizeByteSEnvVarValue := os.Getenv(shared.MaxEntriesDBSizeBytesEnvVar)
if maxEntriesDBSizeByteSEnvVarValue != "" {
maxEntriesDBByteSize, err = strconv.ParseInt(maxEntriesDBSizeByteSEnvVarValue, 10, 64)
}
return maxEntriesDBByteSize, err
}
func checkFileSize(maxSizeBytes int64) {
fileStat, err := os.Stat(DBPath)
if err != nil {

View File

@@ -45,7 +45,7 @@ func ValidateService(serviceFromRule string, service string) bool {
}
func MatchRequestPolicy(harEntry har.Entry, service string) (resultPolicyToSend []RulesMatched, isEnabled bool) {
enforcePolicy, err := shared.DecodeEnforcePolicy(fmt.Sprintf("%s/%s", shared.RulePolicyPath, shared.RulePolicyFileName))
enforcePolicy, err := shared.DecodeEnforcePolicy(fmt.Sprintf("%s%s", shared.ConfigDirPath, shared.ValidationRulesFileName))
if err == nil && len(enforcePolicy.Rules) > 0 {
isEnabled = true
}
@@ -111,7 +111,7 @@ func PassedValidationRules(rulesMatched []RulesMatched) (bool, int64, int) {
if rule.Matched == false {
return false, responseTime, numberOfRulesMatched
} else {
if strings.ToLower(rule.Rule.Type) == "responseTime" {
if strings.ToLower(rule.Rule.Type) == "slo" {
if rule.Rule.ResponseTime < responseTime || responseTime == -1 {
responseTime = rule.Rule.ResponseTime
}

View File

@@ -206,13 +206,13 @@ func syncEntriesImpl(token string, model string, envPrefix string, uploadInterva
sleepTime := time.Second * time.Duration(uploadIntervalSec)
var timestampFrom int64 = 0
var timeFrom time.Time
protocolFilter := "http"
for {
timestampTo := time.Now().UnixNano() / int64(time.Millisecond)
logger.Log.Infof("Getting entries from %v, to %v\n", timestampFrom, timestampTo)
protocolFilter := "http"
entriesArray := database.GetEntriesFromDb(timestampFrom, timestampTo, &protocolFilter)
timeTo := time.Now()
logger.Log.Infof("Getting entries from %v, to %v\n", timeFrom.Format(time.RFC3339Nano), timeTo.Format(time.RFC3339Nano))
entriesArray := database.GetEntriesFromDb(timeFrom, timeTo, &protocolFilter)
if len(entriesArray) > 0 {
result := make([]har.Entry, 0)
@@ -276,13 +276,14 @@ func syncEntriesImpl(token string, model string, envPrefix string, uploadInterva
analyzeInformation.SentCount += len(entriesArray)
logger.Log.Infof("Finish uploading %v entries to %s\n", len(entriesArray), GetTrafficDumpUrl(envPrefix, model))
logger.Log.Infof("Uploaded %v entries until now", analyzeInformation.SentCount)
} else {
logger.Log.Infof("Nothing to upload")
}
logger.Log.Infof("Sleeping for %v...\n", sleepTime)
time.Sleep(sleepTime)
timestampFrom = timestampTo
timeFrom = timeTo
}
}

View File

@@ -27,8 +27,9 @@ build-all: ## Build for all supported platforms.
@mkdir -p bin && sed s/_SEM_VER_/$(SEM_VER)/g README.md.TEMPLATE > bin/README.md
@$(MAKE) build GOOS=darwin GOARCH=amd64
@$(MAKE) build GOOS=linux GOARCH=amd64
@# $(MAKE) build GOOS=darwin GOARCH=arm64
@# $(MAKE) GOOS=windows GOARCH=amd64
@$(MAKE) build GOOS=darwin GOARCH=arm64
@$(MAKE) build GOOS=windows GOARCH=amd64
@mv ./bin/mizu_windows_amd64 ./bin/mizu.exe
@# $(MAKE) GOOS=linux GOARCH=386
@# $(MAKE) GOOS=windows GOARCH=386
@# $(MAKE) GOOS=linux GOARCH=arm64

View File

@@ -2,16 +2,25 @@
Download Mizu for your platform
**Mac** (on Intel chip)
**Mac** (Intel)
```
curl -Lo mizu https://github.com/up9inc/mizu/releases/download/_SEM_VER_/mizu_darwin_amd64 && chmod 755 mizu
```
**Mac** (Apple M1 silicon)
```
curl -Lo mizu https://github.com/up9inc/mizu/releases/download/_SEM_VER_/mizu_darwin_arm64 && chmod 755 mizu
```
**Linux**
```
curl -Lo mizu https://github.com/up9inc/mizu/releases/download/_SEM_VER_/mizu_linux_amd64 && chmod 755 mizu
```
**Windows** (Intel 64bit)
```
curl -LO https://github.com/up9inc/mizu/releases/download/_SEM_VER_/mizu.exe
```
### Checksums
SHA256 checksums available for compiled binaries.

View File

@@ -6,7 +6,6 @@ import (
"fmt"
"net"
"net/http"
"os"
"time"
"github.com/google/uuid"
@@ -33,19 +32,8 @@ func Login() error {
Token: token.AccessToken,
}
configFile, defaultConfigErr := config.GetConfigWithDefaults()
if defaultConfigErr != nil {
return fmt.Errorf("failed getting config with defaults, err: %v", defaultConfigErr)
}
if err := config.LoadConfigFile(config.Config.ConfigFilePath, configFile); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed getting config file, err: %v", err)
}
configFile.Auth = authConfig
if err := config.WriteConfig(configFile); err != nil {
return fmt.Errorf("failed writing config with auth, err: %v", err)
if err := config.UpdateConfig(func(configStruct *config.ConfigStruct) { configStruct.Auth = authConfig }); err != nil {
return fmt.Errorf("failed updating config with auth, err: %v", err)
}
config.Config.Auth = authConfig

cli/cmd/clean.go (new file, 20 lines added)
View File

@@ -0,0 +1,20 @@
package cmd
import (
"github.com/spf13/cobra"
"github.com/up9inc/mizu/cli/telemetry"
)
var cleanCmd = &cobra.Command{
Use: "clean",
Short: "Removes all mizu resources",
RunE: func(cmd *cobra.Command, args []string) error {
go telemetry.ReportRun("clean", nil)
performCleanCommand()
return nil
},
}
func init() {
rootCmd.AddCommand(cleanCmd)
}

cli/cmd/cleanRunner.go (new file, 17 lines added)
View File

@@ -0,0 +1,17 @@
package cmd
import (
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/kubernetes"
"github.com/up9inc/mizu/shared/logger"
)
func performCleanCommand() {
kubernetesProvider, err := kubernetes.NewProvider(config.Config.KubeConfigPath())
if err != nil {
logger.Log.Error(err)
return
}
finishMizuExecution(kubernetesProvider)
}

View File

@@ -21,7 +21,7 @@ func GetApiServerUrl() string {
}
func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
err := kubernetes.StartProxy(kubernetesProvider, config.Config.Tap.GuiPort, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName)
err := kubernetes.StartProxy(kubernetesProvider, config.Config.Tap.ProxyHost, config.Config.Tap.GuiPort, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName)
if err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error occured while running k8s proxy %v\n"+
"Try setting different port by using --%s", errormessage.FormatError(err), configStructs.GuiPortTapName))

View File

@@ -80,10 +80,19 @@ Supported protocols are HTTP and gRPC.`,
func askConfirmation(flagName string) {
logger.Log.Infof(fmt.Sprintf(uploadTrafficMessageToConfirm, flagName))
if !config.Config.Tap.AskUploadConfirmation {
return
}
if !uiUtils.AskForConfirmation("Would you like to proceed [Y/n]: ") {
logger.Log.Infof("You can always run mizu without %s, aborting", flagName)
os.Exit(0)
}
if err := config.UpdateConfig(func(configStruct *config.ConfigStruct) { configStruct.Tap.AskUploadConfirmation = false }); err != nil {
logger.Log.Debugf("failed updating config with upload confirmation, err: %v", err)
}
}
func init() {

View File

@@ -2,8 +2,11 @@ package cmd
import (
"context"
"errors"
"fmt"
"io/ioutil"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"path"
"regexp"
"strings"
@@ -51,9 +54,9 @@ func RunMizuTap() {
return
}
var mizuValidationRules string
var serializedValidationRules string
if config.Config.Tap.EnforcePolicyFile != "" {
mizuValidationRules, err = readValidationRules(config.Config.Tap.EnforcePolicyFile)
serializedValidationRules, err = readValidationRules(config.Config.Tap.EnforcePolicyFile)
if err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error reading policy file: %v", errormessage.FormatError(err)))
return
@@ -61,14 +64,14 @@ func RunMizuTap() {
}
// Read and validate the OAS file
var contract string
var serializedContract string
if config.Config.Tap.ContractFile != "" {
bytes, err := ioutil.ReadFile(config.Config.Tap.ContractFile)
if err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error reading contract file: %v", errormessage.FormatError(err)))
return
}
contract = string(bytes)
serializedContract = string(bytes)
ctx := context.Background()
loader := &openapi3.Loader{Context: ctx}
@@ -84,6 +87,12 @@ func RunMizuTap() {
}
}
serializedMizuConfig, err := config.GetSerializedMizuConfig()
if err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error composing mizu config: %v", errormessage.FormatError(err)))
return
}
kubernetesProvider, err := kubernetes.NewProvider(config.Config.KubeConfigPath())
if err != nil {
logger.Log.Error(err)
@@ -129,11 +138,18 @@ func RunMizuTap() {
return
}
defer finishMizuExecution(kubernetesProvider)
if err := createMizuResources(ctx, kubernetesProvider, mizuValidationRules, contract); err != nil {
if err := createMizuResources(ctx, kubernetesProvider, serializedValidationRules, serializedContract, serializedMizuConfig); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error creating resources: %v", errormessage.FormatError(err)))
var statusError *k8serrors.StatusError
if errors.As(err, &statusError) {
if statusError.ErrStatus.Reason == metav1.StatusReasonAlreadyExists {
logger.Log.Info("Mizu is already running in this namespace, change the `mizu-resources-namespace` configuration or run `mizu clean` to remove the currently running Mizu instance")
}
}
return
}
defer finishMizuExecution(kubernetesProvider)
go goUtils.HandleExcWrapper(watchApiServerPod, ctx, kubernetesProvider, cancel, mizuApiFilteringOptions)
go goUtils.HandleExcWrapper(watchTapperPod, ctx, kubernetesProvider, cancel)
@@ -152,7 +168,7 @@ func readValidationRules(file string) (string, error) {
return string(newContent), nil
}
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, mizuValidationRules string, contract string) error {
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, serializedValidationRules string, serializedContract string, serializedMizuConfig string) error {
if !config.Config.IsNsRestrictedMode() {
if err := createMizuNamespace(ctx, kubernetesProvider); err != nil {
return err
@@ -163,15 +179,15 @@ func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Pro
return err
}
if err := createMizuConfigmap(ctx, kubernetesProvider, mizuValidationRules, contract); err != nil {
if err := createMizuConfigmap(ctx, kubernetesProvider, serializedValidationRules, serializedContract, serializedMizuConfig); err != nil {
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Failed to create resources required for policy validation. Mizu will not validate policy rules. error: %v\n", errormessage.FormatError(err)))
}
return nil
}
func createMizuConfigmap(ctx context.Context, kubernetesProvider *kubernetes.Provider, data string, contract string) error {
err := kubernetesProvider.CreateConfigMap(ctx, config.Config.MizuResourcesNamespace, mizu.ConfigMapName, data, contract)
func createMizuConfigmap(ctx context.Context, kubernetesProvider *kubernetes.Provider, serializedValidationRules string, serializedContract string, serializedMizuConfig string) error {
err := kubernetesProvider.CreateConfigMap(ctx, config.Config.MizuResourcesNamespace, mizu.ConfigMapName, serializedValidationRules, serializedContract, serializedMizuConfig)
return err
}
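The already-exists branch above can be reproduced in isolation. Below is a hedged sketch using the same apimachinery types; k8serrors.NewAlreadyExists only fabricates a representative error for the demo, and k8serrors.IsAlreadyExists(err) would be an equivalent shortcut for the same check:

```go
package main

import (
	"errors"
	"fmt"

	k8serrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
	// Simulate the error a second `mizu tap` run would get from the API server.
	var err error = k8serrors.NewAlreadyExists(schema.GroupResource{Resource: "configmaps"}, "mizu-config")

	var statusError *k8serrors.StatusError
	if errors.As(err, &statusError) && statusError.ErrStatus.Reason == metav1.StatusReasonAlreadyExists {
		fmt.Println("Mizu is already running in this namespace; run `mizu clean` or change mizu-resources-namespace")
	}
}
```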

View File

@@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"io/ioutil"
"k8s.io/apimachinery/pkg/util/json"
"os"
"reflect"
"strconv"
@@ -40,7 +41,7 @@ func InitConfig(cmd *cobra.Command) error {
configFilePathFlag := cmd.Flags().Lookup(ConfigFilePathCommandName)
configFilePath := configFilePathFlag.Value.String()
if err := LoadConfigFile(configFilePath, &Config); err != nil {
if err := loadConfigFile(configFilePath, &Config); err != nil {
if configFilePathFlag.Changed || !os.IsNotExist(err) {
return fmt.Errorf("invalid config, %w\n"+
"you can regenerate the file by removing it (%v) and using `mizu config -r`", err, configFilePath)
@@ -81,7 +82,27 @@ func WriteConfig(config *ConfigStruct) error {
return nil
}
func LoadConfigFile(configFilePath string, config *ConfigStruct) error {
type updateConfigStruct func(*ConfigStruct)
func UpdateConfig(updateConfigStruct updateConfigStruct) error {
configFile, err := GetConfigWithDefaults()
if err != nil {
return fmt.Errorf("failed getting config with defaults, err: %v", err)
}
if err := loadConfigFile(Config.ConfigFilePath, configFile); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed getting config file, err: %v", err)
}
updateConfigStruct(configFile)
if err := WriteConfig(configFile); err != nil {
return fmt.Errorf("failed writing config, err: %v", err)
}
return nil
}
func loadConfigFile(configFilePath string, config *ConfigStruct) error {
reader, openErr := os.Open(configFilePath)
if openErr != nil {
return openErr
@@ -343,3 +364,27 @@ func setZeroForReadonlyFields(currentElem reflect.Value) {
}
}
}
func GetSerializedMizuConfig() (string, error) {
mizuConfig, err := getMizuConfig()
if err != nil {
return "", err
}
serializedConfig, err := json.Marshal(mizuConfig)
if err != nil {
return "", err
}
return string(serializedConfig), nil
}
func getMizuConfig() (*shared.MizuAgentConfig, error) {
serializableRegex, err := shared.CompileRegexToSerializableRegexp(Config.Tap.PodRegexStr)
if err != nil {
return nil, err
}
config := shared.MizuAgentConfig{
TapTargetRegex: *serializableRegex,
MaxDBSizeBytes: Config.Tap.MaxEntriesDBSizeBytes(),
}
return &config, nil
}
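The closure-based UpdateConfig introduced here is what auth.Login and askConfirmation (elsewhere in this diff) now call. A minimal stand-in sketch of the read-modify-write pattern; the real implementation also merges defaults and the on-disk config file before applying the mutation and calling WriteConfig:

```go
package main

import "fmt"

// Simplified stand-ins for the CLI's ConfigStruct and UpdateConfig.
type configStruct struct {
	AskUploadConfirmation bool
}

var persisted = configStruct{AskUploadConfirmation: true}

func updateConfig(mutate func(*configStruct)) error {
	cfg := persisted // load current state (defaults + config file in the real code)
	mutate(&cfg)     // apply the caller's change
	persisted = cfg  // write back (WriteConfig in the real code)
	return nil
}

func main() {
	// Same calling pattern as askConfirmation in the tap command diff above.
	if err := updateConfig(func(c *configStruct) { c.AskUploadConfirmation = false }); err != nil {
		fmt.Println("failed updating config:", err)
	}
	fmt.Printf("ask-upload-confirmation is now %v\n", persisted.AskUploadConfirmation)
}
```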

View File

@@ -14,6 +14,7 @@ import (
const (
MizuResourcesNamespaceConfigName = "mizu-resources-namespace"
ConfigFilePathCommandName = "config-path"
KubeConfigPathConfigName = "kube-config-path"
)
type ConfigStruct struct {

View File

@@ -26,6 +26,7 @@ type TapConfig struct {
UploadIntervalSec int `yaml:"upload-interval" default:"10"`
PodRegexStr string `yaml:"regex" default:".*"`
GuiPort uint16 `yaml:"gui-port" default:"8899"`
ProxyHost string `yaml:"proxy-host" default:"127.0.0.1"`
Namespaces []string `yaml:"namespaces"`
Analysis bool `yaml:"analysis" default:"false"`
AllNamespaces bool `yaml:"all-namespaces" default:"false"`
@@ -37,6 +38,7 @@ type TapConfig struct {
Workspace string `yaml:"workspace"`
EnforcePolicyFile string `yaml:"traffic-validation-file"`
ContractFile string `yaml:"contract"`
AskUploadConfirmation bool `yaml:"ask-upload-confirmation" default:"true"`
ApiServerResources Resources `yaml:"api-server-resources"`
TapperResources Resources `yaml:"tapper-resources"`
}

View File

@@ -7,12 +7,13 @@ import (
"encoding/json"
"errors"
"fmt"
"path/filepath"
"regexp"
"strconv"
"github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/shared/semver"
"k8s.io/apimachinery/pkg/version"
"net/url"
"path/filepath"
"regexp"
"io"
@@ -77,6 +78,14 @@ func NewProvider(kubeConfigPath string) (*Provider, error) {
"you can set alternative kube config file path by adding the kube-config-path field to the mizu config file, err: %w", kubeConfigPath, err)
}
if err := validateNotProxy(kubernetesConfig, restClientConfig); err != nil {
return nil, err
}
if err := validateKubernetesVersion(clientSet); err != nil {
return nil, err
}
return &Provider{
clientSet: clientSet,
kubernetesConfig: kubernetesConfig,
@@ -167,10 +176,8 @@ func (provider *Provider) CreateMizuApiServerPod(ctx context.Context, opts *ApiS
}
}
configMapVolumeName := &core.ConfigMapVolumeSource{}
configMapVolumeName.Name = mizu.ConfigMapName
configMapOptional := true
configMapVolumeName.Optional = &configMapOptional
configMapVolume := &core.ConfigMapVolumeSource{}
configMapVolume.Name = mizu.ConfigMapName
cpuLimit, err := resource.ParseQuantity(opts.Resources.CpuLimit)
if err != nil {
@@ -216,7 +223,7 @@ func (provider *Provider) CreateMizuApiServerPod(ctx context.Context, opts *ApiS
VolumeMounts: []core.VolumeMount{
{
Name: mizu.ConfigMapName,
MountPath: shared.RulePolicyPath,
MountPath: shared.ConfigDirPath,
},
},
Command: command,
@@ -225,10 +232,6 @@ func (provider *Provider) CreateMizuApiServerPod(ctx context.Context, opts *ApiS
Name: shared.SyncEntriesConfigEnvVar,
Value: string(marshaledSyncEntriesConfig),
},
{
Name: shared.MaxEntriesDBSizeBytesEnvVar,
Value: strconv.FormatInt(opts.MaxEntriesDBSizeBytes, 10),
},
{
Name: shared.DebugModeEnvVar,
Value: debugMode,
@@ -269,7 +272,7 @@ func (provider *Provider) CreateMizuApiServerPod(ctx context.Context, opts *ApiS
{
Name: mizu.ConfigMapName,
VolumeSource: core.VolumeSource{
ConfigMap: configMapVolumeName,
ConfigMap: configMapVolume,
},
},
},
@@ -485,14 +488,16 @@ func (provider *Provider) handleRemovalError(err error) error {
return err
}
func (provider *Provider) CreateConfigMap(ctx context.Context, namespace string, configMapName string, data string, contract string) error {
if data == "" && contract == "" {
return nil
}
func (provider *Provider) CreateConfigMap(ctx context.Context, namespace string, configMapName string, serializedValidationRules string, serializedContract string, serializedMizuConfig string) error {
configMapData := make(map[string]string, 0)
configMapData[shared.RulePolicyFileName] = data
configMapData[shared.ContractFileName] = contract
if serializedValidationRules != "" {
configMapData[shared.ValidationRulesFileName] = serializedValidationRules
}
if serializedContract != "" {
configMapData[shared.ContractFileName] = serializedContract
}
configMapData[shared.ConfigFileName] = serializedMizuConfig
configMap := &core.ConfigMap{
TypeMeta: metav1.TypeMeta{
Kind: "ConfigMap",
@@ -611,6 +616,24 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
noScheduleToleration.WithOperator(core.TolerationOpExists)
noScheduleToleration.WithEffect(core.TaintEffectNoSchedule)
volumeName := mizu.ConfigMapName
configMapVolume := applyconfcore.VolumeApplyConfiguration{
Name: &volumeName,
VolumeSourceApplyConfiguration: applyconfcore.VolumeSourceApplyConfiguration{
ConfigMap: &applyconfcore.ConfigMapVolumeSourceApplyConfiguration{
LocalObjectReferenceApplyConfiguration: applyconfcore.LocalObjectReferenceApplyConfiguration{
Name: &volumeName,
},
},
},
}
mountPath := shared.ConfigDirPath
configMapVolumeMount := applyconfcore.VolumeMountApplyConfiguration{
Name: &volumeName,
MountPath: &mountPath,
}
agentContainer.WithVolumeMounts(&configMapVolumeMount)
podSpec := applyconfcore.PodSpec()
podSpec.WithHostNetwork(true)
podSpec.WithDNSPolicy(core.DNSClusterFirstWithHostNet)
@@ -621,6 +644,7 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
podSpec.WithContainers(agentContainer)
podSpec.WithAffinity(affinity)
podSpec.WithTolerations(noExecuteToleration, noScheduleToleration)
podSpec.WithVolumes(&configMapVolume)
podTemplate := applyconfcore.PodTemplateSpec()
podTemplate.WithLabels(map[string]string{"app": tapperPodName})
@@ -727,3 +751,47 @@ func loadKubernetesConfiguration(kubeConfigPath string) clientcmd.ClientConfig {
func isPodRunning(pod *core.Pod) bool {
return pod.Status.Phase == core.PodRunning
}
// We added this after a customer tried to run mizu from Lens, which used Lens's kube config; that config's cluster server points to Lens's local proxy.
// The workaround was to use the user's default local kube config.
// For now we are blocking the option to run mizu through a proxy to the k8s server.
func validateNotProxy(kubernetesConfig clientcmd.ClientConfig, restClientConfig *restclient.Config) error {
kubernetesUrl, err := url.Parse(restClientConfig.Host)
if err != nil {
logger.Log.Debugf("validateNotProxy - error while parsing kubernetes host, err: %v", err)
return nil
}
restProxyClientConfig, _ := kubernetesConfig.ClientConfig()
restProxyClientConfig.Host = kubernetesUrl.Host
clientProxySet, err := getClientSet(restProxyClientConfig)
if err == nil {
proxyServerVersion, err := clientProxySet.ServerVersion()
if err != nil {
return nil
}
if *proxyServerVersion == (version.Info{}) {
return fmt.Errorf("cannot establish http-proxy connection to the Kubernetes cluster. If youre using Lens or similar tool, please run mizu with regular kubectl config using --%v %v=$HOME/.kube/config flag", config.SetCommandName, config.KubeConfigPathConfigName)
}
}
return nil
}
func validateKubernetesVersion(clientSet *kubernetes.Clientset) error {
serverVersion, err := clientSet.ServerVersion()
if err != nil {
logger.Log.Debugf("error while getting kubernetes server version, err: %v", err)
return nil
}
serverVersionSemVer := semver.SemVersion(serverVersion.GitVersion)
minKubernetesServerVersionSemVer := semver.SemVersion(mizu.MinKubernetesServerVersion)
if minKubernetesServerVersionSemVer.GreaterThan(serverVersionSemVer) {
return fmt.Errorf("kubernetes server version %v is not supported, supporting only kubernetes server version of %v or higher", serverVersion.GitVersion, mizu.MinKubernetesServerVersion)
}
return nil
}
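A rough, self-contained illustration of the 1.16.0 gate; this is not the shared/semver implementation, just a major/minor comparison over GitVersion-style strings:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// olderThan116 only inspects the major and minor components of strings
// like "v1.15.11-gke.9"; it exists purely to illustrate the version gate.
func olderThan116(gitVersion string) bool {
	parts := strings.SplitN(strings.TrimPrefix(gitVersion, "v"), ".", 3)
	if len(parts) < 2 {
		return false // unparseable: defer to the real validation
	}
	major, errMajor := strconv.Atoi(parts[0])
	minor, errMinor := strconv.Atoi(parts[1])
	if errMajor != nil || errMinor != nil {
		return false
	}
	return major < 1 || (major == 1 && minor < 16)
}

func main() {
	for _, v := range []string{"v1.15.11-gke.9", "v1.16.0", "v1.22.2"} {
		fmt.Printf("%s supported: %v\n", v, !olderThan116(v))
	}
}
```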

View File

@@ -14,12 +14,12 @@ import (
const k8sProxyApiPrefix = "/"
const mizuServicePort = 80
func StartProxy(kubernetesProvider *Provider, mizuPort uint16, mizuNamespace string, mizuServiceName string) error {
func StartProxy(kubernetesProvider *Provider, proxyHost string, mizuPort uint16, mizuNamespace string, mizuServiceName string) error {
logger.Log.Debugf("Starting proxy. namespace: [%v], service name: [%s], port: [%v]", mizuNamespace, mizuServiceName, mizuPort)
filter := &proxy.FilterServer{
AcceptPaths: proxy.MakeRegexpArrayOrDie(proxy.DefaultPathAcceptRE),
RejectPaths: proxy.MakeRegexpArrayOrDie(proxy.DefaultPathRejectRE),
AcceptHosts: proxy.MakeRegexpArrayOrDie(proxy.DefaultHostAcceptRE),
AcceptHosts: proxy.MakeRegexpArrayOrDie("^.*"),
RejectMethods: proxy.MakeRegexpArrayOrDie(proxy.DefaultMethodRejectRE),
}
@@ -32,7 +32,7 @@ func StartProxy(kubernetesProvider *Provider, mizuPort uint16, mizuNamespace str
mux.Handle("/static/", getRerouteHttpHandlerMizuStatic(proxyHandler, mizuNamespace, mizuServiceName))
mux.Handle("/mizu/", getRerouteHttpHandlerMizuAPI(proxyHandler, mizuNamespace, mizuServiceName))
l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", int(mizuPort)))
l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", proxyHost, int(mizuPort)))
if err != nil {
return err
}

View File

@@ -14,17 +14,18 @@ var (
)
const (
MizuResourcesPrefix = "mizu-"
ApiServerPodName = MizuResourcesPrefix + "api-server"
ClusterRoleBindingName = MizuResourcesPrefix + "cluster-role-binding"
ClusterRoleName = MizuResourcesPrefix + "cluster-role"
K8sAllNamespaces = ""
RoleBindingName = MizuResourcesPrefix + "role-binding"
RoleName = MizuResourcesPrefix + "role"
ServiceAccountName = MizuResourcesPrefix + "service-account"
TapperDaemonSetName = MizuResourcesPrefix + "tapper-daemon-set"
TapperPodName = MizuResourcesPrefix + "tapper"
ConfigMapName = MizuResourcesPrefix + "policy"
MizuResourcesPrefix = "mizu-"
ApiServerPodName = MizuResourcesPrefix + "api-server"
ClusterRoleBindingName = MizuResourcesPrefix + "cluster-role-binding"
ClusterRoleName = MizuResourcesPrefix + "cluster-role"
K8sAllNamespaces = ""
RoleBindingName = MizuResourcesPrefix + "role-binding"
RoleName = MizuResourcesPrefix + "role"
ServiceAccountName = MizuResourcesPrefix + "service-account"
TapperDaemonSetName = MizuResourcesPrefix + "tapper-daemon-set"
TapperPodName = MizuResourcesPrefix + "tapper"
ConfigMapName = MizuResourcesPrefix + "config"
MinKubernetesServerVersion = "1.16.0"
)
func GetMizuFolderPath() string {

View File

@@ -85,7 +85,13 @@ func CheckNewerVersion(versionChan chan string) {
logger.Log.Debugf("Finished version validation, github version %v, current version %v, took %v", gitHubVersion, currentSemVer, time.Since(start))
if gitHubVersionSemVer.GreaterThan(currentSemVer) {
versionChan <- fmt.Sprintf("Update available! %v -> %v (curl -Lo mizu %v/mizu_%s_amd64 && chmod 755 mizu)", mizu.SemVer, gitHubVersion, strings.Replace(*latestRelease.HTMLURL, "tag", "download", 1), runtime.GOOS)
var downloadMessage string
if runtime.GOOS == "windows" {
downloadMessage = fmt.Sprintf("curl -LO %v/mizu.exe", strings.Replace(*latestRelease.HTMLURL, "tag", "download", 1))
} else {
downloadMessage = fmt.Sprintf("curl -Lo mizu %v/mizu_%s_%s && chmod 755 mizu", strings.Replace(*latestRelease.HTMLURL, "tag", "download", 1), runtime.GOOS, runtime.GOARCH)
}
versionChan <- fmt.Sprintf("Update available! %v -> %v (%s)", mizu.SemVer, gitHubVersion, downloadMessage)
} else {
versionChan <- ""
}

View File

@@ -12,7 +12,7 @@ FROM golang:1.16-alpine AS builder
# Set necessary environment variables needed for our image.
ENV CGO_ENABLED=1 GOOS=linux GOARCH=amd64
RUN apk add libpcap-dev gcc g++ make
RUN apk add libpcap-dev gcc g++ make bash
# Move to agent working directory (/agent-build).
WORKDIR /app/agent-build
@@ -20,17 +20,25 @@ WORKDIR /app/agent-build
COPY agent/go.mod agent/go.sum ./
COPY shared/go.mod shared/go.mod ../shared/
COPY tap/go.mod tap/go.mod ../tap/
COPY tap/api/go.* ../tap/api/
RUN go mod download
# cheap trick to make the build faster (as long as go.mod wasn't changed)
RUN go list -f '{{.Path}}@{{.Version}}' -m all | sed 1d | grep -e 'go-cache' -e 'sqlite' | xargs go get
ARG COMMIT_HASH
ARG GIT_BRANCH
ARG BUILD_TIMESTAMP
ARG SEM_VER
# Copy and build agent code
COPY shared ../shared
COPY tap ../tap
COPY agent .
RUN go build -gcflags="all=-N -l" -o mizuagent .
COPY devops/build_extensions_debug.sh ..
RUN cd .. && /bin/bash build_extensions_debug.sh
FROM golang:1.16-alpine
@@ -39,6 +47,7 @@ WORKDIR /app
# Copy binary and config files from /build to root folder of scratch container.
COPY --from=builder ["/app/agent-build/mizuagent", "."]
COPY --from=builder ["/app/agent/build/extensions", "extensions"]
COPY --from=site-build ["/app/ui-build/build", "site"]
# install remote debugging tool

View File

@@ -0,0 +1,28 @@
#!/bin/bash
set -e
GCP_PROJECT=up9-docker-hub
REPOSITORY=gcr.io/$GCP_PROJECT
SERVER_NAME=mizu
GIT_BRANCH=$(git branch | grep \* | cut -d ' ' -f2 | tr '[:upper:]' '[:lower:]')
DOCKER_REPO=$REPOSITORY/$SERVER_NAME/$GIT_BRANCH
SEM_VER=${SEM_VER=0.0.0}
DOCKER_TAGGED_BUILDS=("$DOCKER_REPO:latest" "$DOCKER_REPO:$SEM_VER")
if [ "$GIT_BRANCH" = 'develop' -o "$GIT_BRANCH" = 'master' -o "$GIT_BRANCH" = 'main' ]
then
echo "Pushing to $GIT_BRANCH is allowed only via CI"
exit 1
fi
echo "building ${DOCKER_TAGGED_BUILDS[@]}"
DOCKER_TAGS_ARGS=$(echo ${DOCKER_TAGGED_BUILDS[@]/#/-t }) # "-t FIRST_TAG -t SECOND_TAG ..."
docker build -f debug.Dockerfile $DOCKER_TAGS_ARGS --build-arg SEM_VER=${SEM_VER} --build-arg BUILD_TIMESTAMP=${BUILD_TIMESTAMP} --build-arg GIT_BRANCH=${GIT_BRANCH} --build-arg COMMIT_HASH=${COMMIT_HASH} .
for DOCKER_TAG in "${DOCKER_TAGGED_BUILDS[@]}"
do
echo pushing "$DOCKER_TAG"
docker push "$DOCKER_TAG"
done

View File

@@ -0,0 +1,12 @@
#!/bin/bash
for f in tap/extensions/*; do
if [ -d "$f" ]; then
extension=$(basename $f) && \
cd tap/extensions/${extension} && \
go build -gcflags="all=-N -l" -buildmode=plugin -o ../${extension}.so . && \
cd ../../.. && \
mkdir -p agent/build/extensions && \
cp tap/extensions/${extension}.so agent/build/extensions
fi
done

View File

@@ -6,10 +6,10 @@ const (
HostModeEnvVar = "HOST_MODE"
NodeNameEnvVar = "NODE_NAME"
TappedAddressesPerNodeDictEnvVar = "TAPPED_ADDRESSES_PER_HOST"
MaxEntriesDBSizeBytesEnvVar = "MAX_ENTRIES_DB_BYTES"
RulePolicyPath = "/app/enforce-policy/"
RulePolicyFileName = "enforce-policy.yaml"
ConfigDirPath = "/app/config/"
ValidationRulesFileName = "validation-rules.yaml"
ContractFileName = "contract-oas.yaml"
ConfigFileName = "mizu-config.json"
GoGCEnvVar = "GOGC"
DefaultApiServerPort = 8899
DebugModeEnvVar = "MIZU_DEBUG"

View File

@@ -18,6 +18,11 @@ const (
WebsocketMessageTypeOutboundLink WebSocketMessageType = "outboundLink"
)
type MizuAgentConfig struct {
TapTargetRegex SerializableRegexp `yaml:"tapTargetRegex"`
MaxDBSizeBytes int64 `yaml:"maxDBSizeBytes"`
}
type WebSocketMessageMetadata struct {
MessageType WebSocketMessageType `json:"messageType,omitempty"`
}

View File

@@ -0,0 +1,30 @@
package shared
import "regexp"
type SerializableRegexp struct {
regexp.Regexp
}
func CompileRegexToSerializableRegexp(expr string) (*SerializableRegexp, error) {
re, err := regexp.Compile(expr)
if err != nil {
return nil, err
}
return &SerializableRegexp{*re}, nil
}
// UnmarshalText is used by json.Unmarshal.
func (r *SerializableRegexp) UnmarshalText(text []byte) error {
rr, err := CompileRegexToSerializableRegexp(string(text))
if err != nil {
return err
}
*r = *rr
return nil
}
// MarshalText is used by json.Marshal.
func (r *SerializableRegexp) MarshalText() ([]byte, error) {
return []byte(r.String()), nil
}
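A self-contained sketch of the text-marshalling trick this file relies on: because the wrapper implements encoding.TextMarshaler and TextUnmarshaler, encoding/json stores the regex as its plain pattern string. The type below is a local copy for illustration, and the TapTargetRegex payload is just an example value:

```go
package main

import (
	"encoding/json"
	"fmt"
	"regexp"
)

// Local copy of the wrapper, for illustration only.
type serializableRegexp struct {
	regexp.Regexp
}

func (r *serializableRegexp) UnmarshalText(text []byte) error {
	re, err := regexp.Compile(string(text))
	if err != nil {
		return err
	}
	r.Regexp = *re
	return nil
}

func (r *serializableRegexp) MarshalText() ([]byte, error) {
	return []byte(r.String()), nil
}

type payload struct {
	TapTargetRegex serializableRegexp
}

func main() {
	var p payload
	// The regex arrives as a plain JSON string...
	if err := json.Unmarshal([]byte(`{"TapTargetRegex":"^front-end.*"}`), &p); err != nil {
		panic(err)
	}
	fmt.Println(p.TapTargetRegex.MatchString("front-end-7f4b9")) // true

	// ...and marshals back to the same string.
	out, _ := json.Marshal(&p)
	fmt.Println(string(out)) // {"TapTargetRegex":"^front-end.*"}
}
```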

tap/diagnose/diagnose.go (new file, 76 lines added)
View File

@@ -0,0 +1,76 @@
package diagnose
import (
"fmt"
"os"
"runtime"
"runtime/pprof"
"strconv"
"time"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/api"
)
var AppStats = api.AppStats{}
func StartMemoryProfiler(envDumpPath string, envTimeInterval string) {
dumpPath := "/app/pprof"
if envDumpPath != "" {
dumpPath = envDumpPath
}
timeInterval := 60
if envTimeInterval != "" {
if i, err := strconv.Atoi(envTimeInterval); err == nil {
timeInterval = i
}
}
logger.Log.Info("Profiling is on, results will be written to %s", dumpPath)
go func() {
if _, err := os.Stat(dumpPath); os.IsNotExist(err) {
if err := os.Mkdir(dumpPath, 0777); err != nil {
logger.Log.Fatal("could not create directory for profile: ", err)
}
}
for {
t := time.Now()
filename := fmt.Sprintf("%s/%s__mem.prof", dumpPath, t.Format("15_04_05"))
logger.Log.Infof("Writing memory profile to %s\n", filename)
f, err := os.Create(filename)
if err != nil {
logger.Log.Fatal("could not create memory profile: ", err)
}
runtime.GC() // get up-to-date statistics
if err := pprof.WriteHeapProfile(f); err != nil {
logger.Log.Fatal("could not write memory profile: ", err)
}
_ = f.Close()
time.Sleep(time.Second * time.Duration(timeInterval))
}
}()
}
func DumpMemoryProfile(filename string) error {
if filename == "" {
return nil
}
f, err := os.Create(filename)
if err != nil {
return err
}
defer f.Close()
if err := pprof.WriteHeapProfile(f); err != nil {
return err
}
return nil
}

View File

@@ -1,23 +1,41 @@
package tap
package diagnose
import (
"fmt"
"sync"
"github.com/google/gopacket/examples/util"
"github.com/up9inc/mizu/shared/logger"
)
var TapErrors *errorsMap
type errorsMap struct {
errorsMap map[string]uint
outputLevel int
nErrors uint
OutputLevel int
ErrorsCount uint
errorsMapMutex sync.Mutex
}
func NewErrorsMap(outputLevel int) *errorsMap {
func InitializeErrorsMap(debug bool, verbose bool, quiet bool) {
var outputLevel int
defer util.Run()()
if debug {
outputLevel = 2
} else if verbose {
outputLevel = 1
} else if quiet {
outputLevel = -1
}
TapErrors = newErrorsMap(outputLevel)
}
func newErrorsMap(outputLevel int) *errorsMap {
return &errorsMap{
errorsMap: make(map[string]uint),
outputLevel: outputLevel,
OutputLevel: outputLevel,
}
}
@@ -28,12 +46,12 @@ func NewErrorsMap(outputLevel int) *errorsMap {
*/
func (e *errorsMap) logError(minOutputLevel int, t string, s string, a ...interface{}) {
e.errorsMapMutex.Lock()
e.nErrors++
e.ErrorsCount++
nb := e.errorsMap[t]
e.errorsMap[t] = nb + 1
e.errorsMapMutex.Unlock()
if e.outputLevel >= minOutputLevel {
if e.OutputLevel >= minOutputLevel {
formatStr := fmt.Sprintf("%s: %s", t, s)
logger.Log.Errorf(formatStr, a...)
}
@@ -51,10 +69,17 @@ func (e *errorsMap) Debug(s string, a ...interface{}) {
logger.Log.Debugf(s, a...)
}
func (e *errorsMap) getErrorsSummary() (int, string) {
func (e *errorsMap) GetErrorsSummary() (int, string) {
e.errorsMapMutex.Lock()
errorMapLen := len(e.errorsMap)
errorsSummery := fmt.Sprintf("%v", e.errorsMap)
e.errorsMapMutex.Unlock()
return errorMapLen, errorsSummery
}
func (e *errorsMap) PrintSummary() {
logger.Log.Infof("Errors: %d", e.ErrorsCount)
for t := range e.errorsMap {
logger.Log.Infof(" %s:\t\t%d", e, e.errorsMap[t])
}
}

View File

@@ -0,0 +1,46 @@
package diagnose
import "github.com/up9inc/mizu/shared/logger"
type tapperInternalStats struct {
Ipdefrag int
MissedBytes int
Pkt int
Sz int
Totalsz int
RejectFsm int
RejectOpt int
RejectConnFsm int
Reassembled int
OutOfOrderBytes int
OutOfOrderPackets int
BiggestChunkBytes int
BiggestChunkPackets int
OverlapBytes int
OverlapPackets int
}
var InternalStats *tapperInternalStats
func InitializeTapperInternalStats() {
InternalStats = &tapperInternalStats{}
}
func (stats *tapperInternalStats) PrintStatsSummary() {
logger.Log.Infof("IPdefrag:\t\t%d", stats.Ipdefrag)
logger.Log.Infof("TCP stats:")
logger.Log.Infof(" missed bytes:\t\t%d", stats.MissedBytes)
logger.Log.Infof(" total packets:\t\t%d", stats.Pkt)
logger.Log.Infof(" rejected FSM:\t\t%d", stats.RejectFsm)
logger.Log.Infof(" rejected Options:\t%d", stats.RejectOpt)
logger.Log.Infof(" reassembled bytes:\t%d", stats.Sz)
logger.Log.Infof(" total TCP bytes:\t%d", stats.Totalsz)
logger.Log.Infof(" conn rejected FSM:\t%d", stats.RejectConnFsm)
logger.Log.Infof(" reassembled chunks:\t%d", stats.Reassembled)
logger.Log.Infof(" out-of-order packets:\t%d", stats.OutOfOrderPackets)
logger.Log.Infof(" out-of-order bytes:\t%d", stats.OutOfOrderBytes)
logger.Log.Infof(" biggest-chunk packets:\t%d", stats.BiggestChunkPackets)
logger.Log.Infof(" biggest-chunk bytes:\t%d", stats.BiggestChunkBytes)
logger.Log.Infof(" overlap packets:\t%d", stats.OverlapPackets)
logger.Log.Infof(" overlap bytes:\t\t%d", stats.OverlapBytes)
}

View File

@@ -3,6 +3,8 @@ package tap
import (
"net"
"strings"
"github.com/up9inc/mizu/tap/diagnose"
)
var privateIPBlocks []*net.IPNet
@@ -55,7 +57,7 @@ func initPrivateIPBlocks() {
} {
_, block, err := net.ParseCIDR(cidr)
if err != nil {
tapErrors.Error("Private-IP-Block-Parse", "parse error on %q: %v", cidr, err)
diagnose.TapErrors.Error("Private-IP-Block-Parse", "parse error on %q: %v", cidr, err)
} else {
privateIPBlocks = append(privateIPBlocks, block)
}


@@ -7,11 +7,11 @@ const (
)
type OutboundLink struct {
Src string
DstIP string
DstPort int
Src string
DstIP string
DstPort int
SuggestedResolvedName string
SuggestedProtocol OutboundLinkProtocol
SuggestedProtocol OutboundLinkProtocol
}
func NewOutboundLinkWriter() *OutboundLinkWriter {
@@ -26,11 +26,11 @@ type OutboundLinkWriter struct {
func (olw *OutboundLinkWriter) WriteOutboundLink(src string, DstIP string, DstPort int, SuggestedResolvedName string, SuggestedProtocol OutboundLinkProtocol) {
olw.OutChan <- &OutboundLink{
Src: src,
DstIP: DstIP,
DstPort: DstPort,
Src: src,
DstIP: DstIP,
DstPort: DstPort,
SuggestedResolvedName: SuggestedResolvedName,
SuggestedProtocol: SuggestedProtocol,
SuggestedProtocol: SuggestedProtocol,
}
}


@@ -9,28 +9,17 @@
package tap
import (
"encoding/hex"
"encoding/json"
"flag"
"fmt"
"io"
"os"
"os/signal"
"runtime"
"runtime/pprof"
"strconv"
"strings"
"sync"
"time"
"github.com/google/gopacket"
"github.com/google/gopacket/examples/util"
"github.com/google/gopacket/ip4defrag"
"github.com/google/gopacket/layers" // pulls in all layers decoders
"github.com/google/gopacket/pcap"
"github.com/google/gopacket/reassembly"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/api"
"github.com/up9inc/mizu/tap/diagnose"
"github.com/up9inc/mizu/tap/source"
)
const cleanPeriod = time.Second * 10
@@ -62,28 +51,6 @@ var staleTimeoutSeconds = flag.Int("staletimout", 120, "Max time in seconds to k
var memprofile = flag.String("memprofile", "", "Write memory profile")
var appStats = api.AppStats{}
var tapErrors *errorsMap
// global
var stats struct {
ipdefrag int
missedBytes int
pkt int
sz int
totalsz int
rejectFsm int
rejectOpt int
rejectConnFsm int
reassembled int
outOfOrderBytes int
outOfOrderPackets int
biggestChunkBytes int
biggestChunkPackets int
overlapBytes int
overlapPackets int
}
type TapOpts struct {
HostMode bool
}
@@ -110,353 +77,121 @@ func inArrayString(arr []string, valueToCheck string) bool {
return false
}
// Context
// The assembler context
type Context struct {
CaptureInfo gopacket.CaptureInfo
}
func GetStats() api.AppStats {
return appStats
}
func (c *Context) GetCaptureInfo() gopacket.CaptureInfo {
return c.CaptureInfo
}
func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem, extensionsRef []*api.Extension, options *api.TrafficFilteringOptions) {
hostMode = opts.HostMode
extensions = extensionsRef
filteringOptions = options
if GetMemoryProfilingEnabled() {
startMemoryProfiler()
diagnose.StartMemoryProfiler(os.Getenv(MemoryProfilingDumpPath), os.Getenv(MemoryProfilingTimeIntervalSeconds))
}
go startPassiveTapper(outputItems)
}
func startMemoryProfiler() {
dumpPath := "/app/pprof"
envDumpPath := os.Getenv(MemoryProfilingDumpPath)
if envDumpPath != "" {
dumpPath = envDumpPath
func printPeriodicStats(cleaner *Cleaner) {
statsPeriod := time.Second * time.Duration(*statsevery)
ticker := time.NewTicker(statsPeriod)
for {
<-ticker.C
// Since the start
errorMapLen, errorsSummery := diagnose.TapErrors.GetErrorsSummary()
logger.Log.Infof("%v (errors: %v, errTypes:%v) - Errors Summary: %s",
time.Since(diagnose.AppStats.StartTime),
diagnose.TapErrors.ErrorsCount,
errorMapLen,
errorsSummery,
)
// At this moment
memStats := runtime.MemStats{}
runtime.ReadMemStats(&memStats)
logger.Log.Infof(
"mem: %d, goroutines: %d",
memStats.HeapAlloc,
runtime.NumGoroutine(),
)
// Since the last print
cleanStats := cleaner.dumpStats()
logger.Log.Infof(
"cleaner - flushed connections: %d, closed connections: %d, deleted messages: %d",
cleanStats.flushed,
cleanStats.closed,
cleanStats.deleted,
)
currentAppStats := diagnose.AppStats.DumpStats()
appStatsJSON, _ := json.Marshal(currentAppStats)
logger.Log.Infof("app stats - %v", string(appStatsJSON))
}
timeInterval := 60
envTimeInterval := os.Getenv(MemoryProfilingTimeIntervalSeconds)
if envTimeInterval != "" {
if i, err := strconv.Atoi(envTimeInterval); err == nil {
timeInterval = i
}
}
logger.Log.Info("Profiling is on, results will be written to %s", dumpPath)
go func() {
if _, err := os.Stat(dumpPath); os.IsNotExist(err) {
if err := os.Mkdir(dumpPath, 0777); err != nil {
logger.Log.Fatal("could not create directory for profile: ", err)
}
}
for {
t := time.Now()
filename := fmt.Sprintf("%s/%s__mem.prof", dumpPath, t.Format("15_04_05"))
logger.Log.Infof("Writing memory profile to %s\n", filename)
f, err := os.Create(filename)
if err != nil {
logger.Log.Fatal("could not create memory profile: ", err)
}
runtime.GC() // get up-to-date statistics
if err := pprof.WriteHeapProfile(f); err != nil {
logger.Log.Fatal("could not write memory profile: ", err)
}
_ = f.Close()
time.Sleep(time.Second * time.Duration(timeInterval))
}
}()
}
func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
streamsMap := NewTcpStreamMap()
go streamsMap.closeTimedoutTcpStreamChannels()
var outputLevel int
diagnose.InitializeErrorsMap(*debug, *verbose, *quiet)
diagnose.InitializeTapperInternalStats()
defer util.Run()()
if *debug {
outputLevel = 2
} else if *verbose {
outputLevel = 1
} else if *quiet {
outputLevel = -1
}
tapErrors = NewErrorsMap(outputLevel)
var handle *pcap.Handle
var err error
if *fname != "" {
if handle, err = pcap.OpenOffline(*fname); err != nil {
logger.Log.Fatalf("PCAP OpenOffline error: %v", err)
}
} else {
// This is a little complicated because we want to allow all possible options
// for creating the packet capture handle... instead of all this you can
// just call pcap.OpenLive if you want a simple handle.
inactive, err := pcap.NewInactiveHandle(*iface)
if err != nil {
logger.Log.Fatalf("could not create: %v", err)
}
defer inactive.CleanUp()
if err = inactive.SetSnapLen(*snaplen); err != nil {
logger.Log.Fatalf("could not set snap length: %v", err)
} else if err = inactive.SetPromisc(*promisc); err != nil {
logger.Log.Fatalf("could not set promisc mode: %v", err)
} else if err = inactive.SetTimeout(time.Second); err != nil {
logger.Log.Fatalf("could not set timeout: %v", err)
}
if *tstype != "" {
if t, err := pcap.TimestampSourceFromString(*tstype); err != nil {
logger.Log.Fatalf("Supported timestamp types: %v", inactive.SupportedTimestamps())
} else if err := inactive.SetTimestampSource(t); err != nil {
logger.Log.Fatalf("Supported timestamp types: %v", inactive.SupportedTimestamps())
}
}
if handle, err = inactive.Activate(); err != nil {
logger.Log.Fatalf("PCAP Activate error: %v", err)
}
defer handle.Close()
}
var bpffilter string
if len(flag.Args()) > 0 {
bpffilter := strings.Join(flag.Args(), " ")
logger.Log.Infof("Using BPF filter %q", bpffilter)
if err = handle.SetBPFFilter(bpffilter); err != nil {
logger.Log.Fatalf("BPF filter error: %v", err)
}
bpffilter = strings.Join(flag.Args(), " ")
}
var dec gopacket.Decoder
var ok bool
decoderName := *decoder
if decoderName == "" {
decoderName = handle.LinkType().String()
packetSource, err := source.NewTcpPacketSource(*fname, *iface, source.TcpPacketSourceBehaviour{
SnapLength: *snaplen,
Promisc: *promisc,
Tstype: *tstype,
DecoderName: *decoder,
Lazy: *lazy,
BpfFilter: bpffilter,
})
if err != nil {
logger.Log.Fatal(err)
}
if dec, ok = gopacket.DecodersByLayerName[decoderName]; !ok {
logger.Log.Fatal("No decoder named", decoderName)
defer packetSource.Close()
if err != nil {
logger.Log.Fatal(err)
}
source := gopacket.NewPacketSource(handle, dec)
source.Lazy = *lazy
source.NoCopy = true
packets := make(chan source.TcpPacketInfo)
assembler := NewTcpAssembler(outputItems, streamsMap)
logger.Log.Info("Starting to read packets")
appStats.SetStartTime(time.Now())
defragger := ip4defrag.NewIPv4Defragmenter()
diagnose.AppStats.SetStartTime(time.Now())
var emitter api.Emitter = &api.Emitting{
AppStats: &appStats,
OutputChannel: outputItems,
}
streamFactory := NewTcpStreamFactory(emitter, streamsMap)
streamPool := reassembly.NewStreamPool(streamFactory)
assembler := reassembly.NewAssembler(streamPool)
maxBufferedPagesTotal := GetMaxBufferedPagesPerConnection()
maxBufferedPagesPerConnection := GetMaxBufferedPagesTotal()
logger.Log.Infof("Assembler options: maxBufferedPagesTotal=%d, maxBufferedPagesPerConnection=%d", maxBufferedPagesTotal, maxBufferedPagesPerConnection)
assembler.AssemblerOptions.MaxBufferedPagesTotal = maxBufferedPagesTotal
assembler.AssemblerOptions.MaxBufferedPagesPerConnection = maxBufferedPagesPerConnection
var assemblerMutex sync.Mutex
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt)
go packetSource.ReadPackets(!*nodefrag, packets)
staleConnectionTimeout := time.Second * time.Duration(*staleTimeoutSeconds)
cleaner := Cleaner{
assembler: assembler,
assemblerMutex: &assemblerMutex,
assembler: assembler.Assembler,
assemblerMutex: &assembler.assemblerMutex,
cleanPeriod: cleanPeriod,
connectionTimeout: staleConnectionTimeout,
}
cleaner.start()
go func() {
statsPeriod := time.Second * time.Duration(*statsevery)
ticker := time.NewTicker(statsPeriod)
go printPeriodicStats(&cleaner)
for {
<-ticker.C
assembler.processPackets(*hexdumppkt, packets)
// Since the start
errorMapLen, errorsSummery := tapErrors.getErrorsSummary()
logger.Log.Infof("%v (errors: %v, errTypes:%v) - Errors Summary: %s",
time.Since(appStats.StartTime),
tapErrors.nErrors,
errorMapLen,
errorsSummery,
)
// At this moment
memStats := runtime.MemStats{}
runtime.ReadMemStats(&memStats)
logger.Log.Infof(
"mem: %d, goroutines: %d",
memStats.HeapAlloc,
runtime.NumGoroutine(),
)
// Since the last print
cleanStats := cleaner.dumpStats()
logger.Log.Infof(
"cleaner - flushed connections: %d, closed connections: %d, deleted messages: %d",
cleanStats.flushed,
cleanStats.closed,
cleanStats.deleted,
)
currentAppStats := appStats.DumpStats()
appStatsJSON, _ := json.Marshal(currentAppStats)
logger.Log.Infof("app stats - %v", string(appStatsJSON))
}
}()
if GetMemoryProfilingEnabled() {
startMemoryProfiler()
if diagnose.TapErrors.OutputLevel >= 2 {
assembler.dumpStreamPool()
}
for {
packet, err := source.NextPacket()
if err == io.EOF {
break
} else if err != nil {
if err.Error() != "Timeout Expired" {
logger.Log.Debugf("Error: %T", err)
}
continue
}
packetsCount := appStats.IncPacketsCount()
logger.Log.Debugf("PACKET #%d", packetsCount)
data := packet.Data()
appStats.UpdateProcessedBytes(uint64(len(data)))
if *hexdumppkt {
logger.Log.Debugf("Packet content (%d/0x%x) - %s", len(data), len(data), hex.Dump(data))
}
// defrag the IPv4 packet if required
if !*nodefrag {
ip4Layer := packet.Layer(layers.LayerTypeIPv4)
if ip4Layer == nil {
continue
}
ip4 := ip4Layer.(*layers.IPv4)
l := ip4.Length
newip4, err := defragger.DefragIPv4(ip4)
if err != nil {
logger.Log.Fatal("Error while de-fragmenting", err)
} else if newip4 == nil {
logger.Log.Debugf("Fragment...")
continue // packet fragment, we don't have whole packet yet.
}
if newip4.Length != l {
stats.ipdefrag++
logger.Log.Debugf("Decoding re-assembled packet: %s", newip4.NextLayerType())
pb, ok := packet.(gopacket.PacketBuilder)
if !ok {
logger.Log.Panic("Not a PacketBuilder")
}
nextDecoder := newip4.NextLayerType()
_ = nextDecoder.Decode(newip4.Payload, pb)
}
}
tcp := packet.Layer(layers.LayerTypeTCP)
if tcp != nil {
appStats.IncTcpPacketsCount()
tcp := tcp.(*layers.TCP)
if *checksum {
err := tcp.SetNetworkLayerForChecksum(packet.NetworkLayer())
if err != nil {
logger.Log.Fatalf("Failed to set network layer for checksum: %s\n", err)
}
}
c := Context{
CaptureInfo: packet.Metadata().CaptureInfo,
}
stats.totalsz += len(tcp.Payload)
logger.Log.Debugf("%s : %v -> %s : %v", packet.NetworkLayer().NetworkFlow().Src(), tcp.SrcPort, packet.NetworkLayer().NetworkFlow().Dst(), tcp.DstPort)
assemblerMutex.Lock()
assembler.AssembleWithContext(packet.NetworkLayer().NetworkFlow(), tcp, &c)
assemblerMutex.Unlock()
}
done := *maxcount > 0 && int64(appStats.PacketsCount) >= *maxcount
if done {
errorMapLen, _ := tapErrors.getErrorsSummary()
logger.Log.Infof("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)",
appStats.PacketsCount,
appStats.ProcessedBytes,
time.Since(appStats.StartTime),
tapErrors.nErrors,
errorMapLen)
}
select {
case <-signalChan:
logger.Log.Infof("Caught SIGINT: aborting")
done = true
default:
// NOP: continue
}
if done {
break
}
if err := diagnose.DumpMemoryProfile(*memprofile); err != nil {
logger.Log.Errorf("Error dumping memory profile %v\n", err)
}
assemblerMutex.Lock()
closed := assembler.FlushAll()
assemblerMutex.Unlock()
logger.Log.Debugf("Final flush: %d closed", closed)
if outputLevel >= 2 {
streamPool.Dump()
}
assembler.waitAndDump()
if *memprofile != "" {
f, err := os.Create(*memprofile)
if err != nil {
logger.Log.Fatal(err)
}
_ = pprof.WriteHeapProfile(f)
_ = f.Close()
}
streamFactory.WaitGoRoutines()
assemblerMutex.Lock()
logger.Log.Debugf("%s", assembler.Dump())
assemblerMutex.Unlock()
if !*nodefrag {
logger.Log.Infof("IPdefrag:\t\t%d", stats.ipdefrag)
}
logger.Log.Infof("TCP stats:")
logger.Log.Infof(" missed bytes:\t\t%d", stats.missedBytes)
logger.Log.Infof(" total packets:\t\t%d", stats.pkt)
logger.Log.Infof(" rejected FSM:\t\t%d", stats.rejectFsm)
logger.Log.Infof(" rejected Options:\t%d", stats.rejectOpt)
logger.Log.Infof(" reassembled bytes:\t%d", stats.sz)
logger.Log.Infof(" total TCP bytes:\t%d", stats.totalsz)
logger.Log.Infof(" conn rejected FSM:\t%d", stats.rejectConnFsm)
logger.Log.Infof(" reassembled chunks:\t%d", stats.reassembled)
logger.Log.Infof(" out-of-order packets:\t%d", stats.outOfOrderPackets)
logger.Log.Infof(" out-of-order bytes:\t%d", stats.outOfOrderBytes)
logger.Log.Infof(" biggest-chunk packets:\t%d", stats.biggestChunkPackets)
logger.Log.Infof(" biggest-chunk bytes:\t%d", stats.biggestChunkBytes)
logger.Log.Infof(" overlap packets:\t%d", stats.overlapPackets)
logger.Log.Infof(" overlap bytes:\t\t%d", stats.overlapBytes)
logger.Log.Infof("Errors: %d", tapErrors.nErrors)
for e := range tapErrors.errorsMap {
logger.Log.Infof(" %s:\t\t%d", e, tapErrors.errorsMap[e])
}
logger.Log.Infof("AppStats: %v", GetStats())
diagnose.InternalStats.PrintStatsSummary()
diagnose.TapErrors.PrintSummary()
logger.Log.Infof("AppStats: %v", diagnose.AppStats)
}
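Because the removed and added lines are interleaved above, here is a condensed sketch of what the refactored startPassiveTapper body boils down to (it lives in package tap; flag parsing and most error handling are trimmed):

streamsMap := NewTcpStreamMap()
go streamsMap.closeTimedoutTcpStreamChannels()

diagnose.InitializeErrorsMap(*debug, *verbose, *quiet)
diagnose.InitializeTapperInternalStats()

// The packet source wraps pcap handle creation, BPF filtering and decoding.
packetSource, err := source.NewTcpPacketSource(*fname, *iface, source.TcpPacketSourceBehaviour{
	SnapLength:  *snaplen,
	Promisc:     *promisc,
	Tstype:      *tstype,
	DecoderName: *decoder,
	Lazy:        *lazy,
	BpfFilter:   bpffilter,
})
if err != nil {
	logger.Log.Fatal(err)
}
defer packetSource.Close()

// Packets are read (and optionally IPv4-defragmented) on a separate goroutine.
packets := make(chan source.TcpPacketInfo)
go packetSource.ReadPackets(!*nodefrag, packets)

assembler := NewTcpAssembler(outputItems, streamsMap)
diagnose.AppStats.SetStartTime(time.Now())

cleaner := Cleaner{
	assembler:         assembler.Assembler,
	assemblerMutex:    &assembler.assemblerMutex,
	cleanPeriod:       cleanPeriod,
	connectionTimeout: time.Second * time.Duration(*staleTimeoutSeconds),
}
cleaner.start()
go printPeriodicStats(&cleaner)

// Blocks until EOF, maxcount or SIGINT, flushing the assembler on exit.
assembler.processPackets(*hexdumppkt, packets)
assembler.waitAndDump()

diagnose.InternalStats.PrintStatsSummary()
diagnose.TapErrors.PrintSummary()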


@@ -0,0 +1,150 @@
package source
import (
"fmt"
"io"
"time"
"github.com/google/gopacket"
"github.com/google/gopacket/ip4defrag"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/diagnose"
)
type TcpPacketSource struct {
source *gopacket.PacketSource
handle *pcap.Handle
defragger *ip4defrag.IPv4Defragmenter
Behaviour *TcpPacketSourceBehaviour
}
type TcpPacketSourceBehaviour struct {
SnapLength int
Promisc bool
Tstype string
DecoderName string
Lazy bool
BpfFilter string
}
type TcpPacketInfo struct {
Packet gopacket.Packet
Source *TcpPacketSource
}
func NewTcpPacketSource(filename string, interfaceName string,
behaviour TcpPacketSourceBehaviour) (*TcpPacketSource, error) {
var err error
result := &TcpPacketSource{
defragger: ip4defrag.NewIPv4Defragmenter(),
Behaviour: &behaviour,
}
if filename != "" {
if result.handle, err = pcap.OpenOffline(filename); err != nil {
return result, fmt.Errorf("PCAP OpenOffline error: %v", err)
}
} else {
// This is a little complicated because we want to allow all possible options
// for creating the packet capture handle... instead of all this you can
// just call pcap.OpenLive if you want a simple handle.
inactive, err := pcap.NewInactiveHandle(interfaceName)
if err != nil {
return result, fmt.Errorf("could not create: %v", err)
}
defer inactive.CleanUp()
if err = inactive.SetSnapLen(behaviour.SnapLength); err != nil {
return result, fmt.Errorf("could not set snap length: %v", err)
} else if err = inactive.SetPromisc(behaviour.Promisc); err != nil {
return result, fmt.Errorf("could not set promisc mode: %v", err)
} else if err = inactive.SetTimeout(time.Second); err != nil {
return result, fmt.Errorf("could not set timeout: %v", err)
}
if behaviour.Tstype != "" {
if t, err := pcap.TimestampSourceFromString(behaviour.Tstype); err != nil {
return result, fmt.Errorf("supported timestamp types: %v", inactive.SupportedTimestamps())
} else if err := inactive.SetTimestampSource(t); err != nil {
return result, fmt.Errorf("supported timestamp types: %v", inactive.SupportedTimestamps())
}
}
if result.handle, err = inactive.Activate(); err != nil {
return result, fmt.Errorf("PCAP Activate error: %v", err)
}
}
if behaviour.BpfFilter != "" {
logger.Log.Infof("Using BPF filter %q", behaviour.BpfFilter)
if err = result.handle.SetBPFFilter(behaviour.BpfFilter); err != nil {
return nil, fmt.Errorf("BPF filter error: %v", err)
}
}
var dec gopacket.Decoder
var ok bool
if behaviour.DecoderName == "" {
behaviour.DecoderName = result.handle.LinkType().String()
}
if dec, ok = gopacket.DecodersByLayerName[behaviour.DecoderName]; !ok {
return nil, fmt.Errorf("no decoder named %v", behaviour.DecoderName)
}
result.source = gopacket.NewPacketSource(result.handle, dec)
result.source.Lazy = behaviour.Lazy
result.source.NoCopy = true
return result, nil
}
func (source *TcpPacketSource) Close() {
if source.handle != nil {
source.handle.Close()
}
}
func (source *TcpPacketSource) ReadPackets(ipdefrag bool, packets chan<- TcpPacketInfo) error {
for {
packet, err := source.source.NextPacket()
if err == io.EOF {
return err
} else if err != nil {
if err.Error() != "Timeout Expired" {
logger.Log.Debugf("Error: %T", err)
}
continue
}
// defrag the IPv4 packet if required
if ipdefrag {
ip4Layer := packet.Layer(layers.LayerTypeIPv4)
if ip4Layer == nil {
continue
}
ip4 := ip4Layer.(*layers.IPv4)
l := ip4.Length
newip4, err := source.defragger.DefragIPv4(ip4)
if err != nil {
logger.Log.Fatal("Error while de-fragmenting", err)
} else if newip4 == nil {
logger.Log.Debugf("Fragment...")
continue // packet fragment, we don't have whole packet yet.
}
if newip4.Length != l {
diagnose.InternalStats.Ipdefrag++
logger.Log.Debugf("Decoding re-assembled packet: %s", newip4.NextLayerType())
pb, ok := packet.(gopacket.PacketBuilder)
if !ok {
logger.Log.Panic("Not a PacketBuilder")
}
nextDecoder := newip4.NextLayerType()
_ = nextDecoder.Decode(newip4.Payload, pb)
}
}
packets <- TcpPacketInfo{
Packet: packet,
Source: source,
}
}
}
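Since NewTcpPacketSource takes either a capture file or an interface name, the same type also covers offline replay; a hypothetical standalone snippet (the file path and the fixed-count consumer loop are illustrative only):

package main

import (
	"github.com/up9inc/mizu/shared/logger"
	"github.com/up9inc/mizu/tap/source"
)

func main() {
	// A non-empty file name makes NewTcpPacketSource open the capture offline
	// instead of configuring a live pcap handle.
	src, err := source.NewTcpPacketSource("/tmp/capture.pcap", "", source.TcpPacketSourceBehaviour{})
	if err != nil {
		logger.Log.Fatal(err)
	}
	defer src.Close()

	packets := make(chan source.TcpPacketInfo)
	go src.ReadPackets(true, packets)

	// ReadPackets returns on EOF without closing the channel, so a real consumer
	// needs its own termination condition; here we just take the first 10 packets.
	for i := 0; i < 10; i++ {
		info := <-packets
		logger.Log.Infof("packet #%d: %d bytes", i+1, len(info.Packet.Data()))
	}
}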

tap/tcp_assembler.go (new file, 132 lines)

@@ -0,0 +1,132 @@
package tap
import (
"encoding/hex"
"os"
"os/signal"
"sync"
"time"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/reassembly"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/api"
"github.com/up9inc/mizu/tap/diagnose"
"github.com/up9inc/mizu/tap/source"
)
type tcpAssembler struct {
*reassembly.Assembler
streamPool *reassembly.StreamPool
streamFactory *tcpStreamFactory
assemblerMutex sync.Mutex
}
// Context
// The assembler context
type context struct {
CaptureInfo gopacket.CaptureInfo
}
func (c *context) GetCaptureInfo() gopacket.CaptureInfo {
return c.CaptureInfo
}
func NewTcpAssembler(outputItems chan *api.OutputChannelItem, streamsMap *tcpStreamMap) *tcpAssembler {
var emitter api.Emitter = &api.Emitting{
AppStats: &diagnose.AppStats,
OutputChannel: outputItems,
}
streamFactory := NewTcpStreamFactory(emitter, streamsMap)
streamPool := reassembly.NewStreamPool(streamFactory)
assembler := reassembly.NewAssembler(streamPool)
maxBufferedPagesTotal := GetMaxBufferedPagesTotal()
maxBufferedPagesPerConnection := GetMaxBufferedPagesPerConnection()
logger.Log.Infof("Assembler options: maxBufferedPagesTotal=%d, maxBufferedPagesPerConnection=%d",
maxBufferedPagesTotal, maxBufferedPagesPerConnection)
assembler.AssemblerOptions.MaxBufferedPagesTotal = maxBufferedPagesTotal
assembler.AssemblerOptions.MaxBufferedPagesPerConnection = maxBufferedPagesPerConnection
return &tcpAssembler{
Assembler: assembler,
streamPool: streamPool,
streamFactory: streamFactory,
}
}
func (a *tcpAssembler) processPackets(dumpPacket bool, packets <-chan source.TcpPacketInfo) {
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt)
for packetInfo := range packets {
packetsCount := diagnose.AppStats.IncPacketsCount()
logger.Log.Debugf("PACKET #%d", packetsCount)
packet := packetInfo.Packet
data := packet.Data()
diagnose.AppStats.UpdateProcessedBytes(uint64(len(data)))
if dumpPacket {
logger.Log.Debugf("Packet content (%d/0x%x) - %s", len(data), len(data), hex.Dump(data))
}
tcp := packet.Layer(layers.LayerTypeTCP)
if tcp != nil {
diagnose.AppStats.IncTcpPacketsCount()
tcp := tcp.(*layers.TCP)
if *checksum {
err := tcp.SetNetworkLayerForChecksum(packet.NetworkLayer())
if err != nil {
logger.Log.Fatalf("Failed to set network layer for checksum: %s\n", err)
}
}
c := context{
CaptureInfo: packet.Metadata().CaptureInfo,
}
diagnose.InternalStats.Totalsz += len(tcp.Payload)
logger.Log.Debugf("%s : %v -> %s : %v", packet.NetworkLayer().NetworkFlow().Src(), tcp.SrcPort, packet.NetworkLayer().NetworkFlow().Dst(), tcp.DstPort)
a.assemblerMutex.Lock()
a.AssembleWithContext(packet.NetworkLayer().NetworkFlow(), tcp, &c)
a.assemblerMutex.Unlock()
}
done := *maxcount > 0 && int64(diagnose.AppStats.PacketsCount) >= *maxcount
if done {
errorMapLen, _ := diagnose.TapErrors.GetErrorsSummary()
logger.Log.Infof("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)",
diagnose.AppStats.PacketsCount,
diagnose.AppStats.ProcessedBytes,
time.Since(diagnose.AppStats.StartTime),
diagnose.TapErrors.ErrorsCount,
errorMapLen)
}
select {
case <-signalChan:
logger.Log.Infof("Caught SIGINT: aborting")
done = true
default:
// NOP: continue
}
if done {
break
}
}
a.assemblerMutex.Lock()
closed := a.FlushAll()
a.assemblerMutex.Unlock()
logger.Log.Debugf("Final flush: %d closed", closed)
}
func (a *tcpAssembler) dumpStreamPool() {
a.streamPool.Dump()
}
func (a *tcpAssembler) waitAndDump() {
a.streamFactory.WaitGoRoutines()
a.assemblerMutex.Lock()
logger.Log.Debugf("%s", a.Dump())
a.assemblerMutex.Unlock()
}


@@ -9,6 +9,7 @@ import (
"github.com/google/gopacket/layers" // pulls in all layers decoders
"github.com/google/gopacket/reassembly"
"github.com/up9inc/mizu/tap/api"
"github.com/up9inc/mizu/tap/diagnose"
)
/* It's a connection (bidirectional)
@@ -36,11 +37,11 @@ type tcpStream struct {
func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassembly.TCPFlowDirection, nextSeq reassembly.Sequence, start *bool, ac reassembly.AssemblerContext) bool {
// FSM
if !t.tcpstate.CheckState(tcp, dir) {
tapErrors.SilentError("FSM-rejection", "%s: Packet rejected by FSM (state:%s)", t.ident, t.tcpstate.String())
stats.rejectFsm++
diagnose.TapErrors.SilentError("FSM-rejection", "%s: Packet rejected by FSM (state:%s)", t.ident, t.tcpstate.String())
diagnose.InternalStats.RejectFsm++
if !t.fsmerr {
t.fsmerr = true
stats.rejectConnFsm++
diagnose.InternalStats.RejectConnFsm++
}
if !*ignorefsmerr {
return false
@@ -49,8 +50,8 @@ func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassem
// Options
err := t.optchecker.Accept(tcp, ci, dir, nextSeq, start)
if err != nil {
tapErrors.SilentError("OptionChecker-rejection", "%s: Packet rejected by OptionChecker: %s", t.ident, err)
stats.rejectOpt++
diagnose.TapErrors.SilentError("OptionChecker-rejection", "%s: Packet rejected by OptionChecker: %s", t.ident, err)
diagnose.InternalStats.RejectOpt++
if !*nooptcheck {
return false
}
@@ -60,15 +61,15 @@ func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassem
if *checksum {
c, err := tcp.ComputeChecksum()
if err != nil {
tapErrors.SilentError("ChecksumCompute", "%s: Got error computing checksum: %s", t.ident, err)
diagnose.TapErrors.SilentError("ChecksumCompute", "%s: Got error computing checksum: %s", t.ident, err)
accept = false
} else if c != 0x0 {
tapErrors.SilentError("Checksum", "%s: Invalid checksum: 0x%x", t.ident, c)
diagnose.TapErrors.SilentError("Checksum", "%s: Invalid checksum: 0x%x", t.ident, c)
accept = false
}
}
if !accept {
stats.rejectOpt++
diagnose.InternalStats.RejectOpt++
}
return accept
}
@@ -79,28 +80,28 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
// update stats
sgStats := sg.Stats()
if skip > 0 {
stats.missedBytes += skip
diagnose.InternalStats.MissedBytes += skip
}
stats.sz += length - saved
stats.pkt += sgStats.Packets
diagnose.InternalStats.Sz += length - saved
diagnose.InternalStats.Pkt += sgStats.Packets
if sgStats.Chunks > 1 {
stats.reassembled++
diagnose.InternalStats.Reassembled++
}
stats.outOfOrderPackets += sgStats.QueuedPackets
stats.outOfOrderBytes += sgStats.QueuedBytes
if length > stats.biggestChunkBytes {
stats.biggestChunkBytes = length
diagnose.InternalStats.OutOfOrderPackets += sgStats.QueuedPackets
diagnose.InternalStats.OutOfOrderBytes += sgStats.QueuedBytes
if length > diagnose.InternalStats.BiggestChunkBytes {
diagnose.InternalStats.BiggestChunkBytes = length
}
if sgStats.Packets > stats.biggestChunkPackets {
stats.biggestChunkPackets = sgStats.Packets
if sgStats.Packets > diagnose.InternalStats.BiggestChunkPackets {
diagnose.InternalStats.BiggestChunkPackets = sgStats.Packets
}
if sgStats.OverlapBytes != 0 && sgStats.OverlapPackets == 0 {
// In the original example this was handled with panic().
// I don't know what this error means or how to handle it properly.
tapErrors.SilentError("Invalid-Overlap", "bytes:%d, pkts:%d", sgStats.OverlapBytes, sgStats.OverlapPackets)
diagnose.TapErrors.SilentError("Invalid-Overlap", "bytes:%d, pkts:%d", sgStats.OverlapBytes, sgStats.OverlapPackets)
}
stats.overlapBytes += sgStats.OverlapBytes
stats.overlapPackets += sgStats.OverlapPackets
diagnose.InternalStats.OverlapBytes += sgStats.OverlapBytes
diagnose.InternalStats.OverlapPackets += sgStats.OverlapPackets
var ident string
if dir == reassembly.TCPDirClientToServer {
@@ -108,7 +109,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
} else {
ident = fmt.Sprintf("%v %v(%s): ", t.net.Reverse(), t.transport.Reverse(), dir)
}
tapErrors.Debug("%s: SG reassembled packet with %d bytes (start:%v,end:%v,skip:%d,saved:%d,nb:%d,%d,overlap:%d,%d)", ident, length, start, end, skip, saved, sgStats.Packets, sgStats.Chunks, sgStats.OverlapBytes, sgStats.OverlapPackets)
diagnose.TapErrors.Debug("%s: SG reassembled packet with %d bytes (start:%v,end:%v,skip:%d,saved:%d,nb:%d,%d,overlap:%d,%d)", ident, length, start, end, skip, saved, sgStats.Packets, sgStats.Chunks, sgStats.OverlapBytes, sgStats.OverlapPackets)
if skip == -1 && *allowmissinginit {
// this is allowed
} else if skip != 0 {
@@ -127,18 +128,18 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
}
dnsSize := binary.BigEndian.Uint16(data[:2])
missing := int(dnsSize) - len(data[2:])
tapErrors.Debug("dnsSize: %d, missing: %d", dnsSize, missing)
diagnose.TapErrors.Debug("dnsSize: %d, missing: %d", dnsSize, missing)
if missing > 0 {
tapErrors.Debug("Missing some bytes: %d", missing)
diagnose.TapErrors.Debug("Missing some bytes: %d", missing)
sg.KeepFrom(0)
return
}
p := gopacket.NewDecodingLayerParser(layers.LayerTypeDNS, dns)
err := p.DecodeLayers(data[2:], &decoded)
if err != nil {
tapErrors.SilentError("DNS-parser", "Failed to decode DNS: %v", err)
diagnose.TapErrors.SilentError("DNS-parser", "Failed to decode DNS: %v", err)
} else {
tapErrors.Debug("DNS: %s", gopacket.LayerDump(dns))
diagnose.TapErrors.Debug("DNS: %s", gopacket.LayerDump(dns))
}
if len(data) > 2+int(dnsSize) {
sg.KeepFrom(2 + int(dnsSize))
@@ -147,7 +148,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
if length > 0 {
// This is where we pass the reassembled information onwards
// This channel is read by an tcpReader object
appStats.IncReassembledTcpPayloadsCount()
diagnose.AppStats.IncReassembledTcpPayloadsCount()
timestamp := ac.GetCaptureInfo().Timestamp
if dir == reassembly.TCPDirClientToServer {
for i := range t.clients {
@@ -173,7 +174,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
}
func (t *tcpStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool {
tapErrors.Debug("%s: Connection closed", t.ident)
diagnose.TapErrors.Debug("%s: Connection closed", t.ident)
if t.isTapTarget && !t.isClosed {
t.Close()
}


@@ -7,6 +7,7 @@ import (
"time"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/diagnose"
)
type tcpStreamMap struct {
@@ -44,9 +45,9 @@ func (streamMap *tcpStreamMap) closeTimedoutTcpStreamChannels() {
if stream.superIdentifier.Protocol == nil {
if !stream.isClosed && time.Now().After(streamWrapper.createdAt.Add(tcpStreamChannelTimeout)) {
stream.Close()
appStats.IncDroppedTcpStreams()
diagnose.AppStats.IncDroppedTcpStreams()
logger.Log.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d\n",
appStats.DroppedTcpStreams, runtime.NumGoroutine(), tcpStreamChannelTimeout/1000000)
diagnose.AppStats.DroppedTcpStreams, runtime.NumGoroutine(), tcpStreamChannelTimeout/1000000)
}
} else {
if !stream.superIdentifier.IsClosedOthers {


@@ -2,7 +2,6 @@ package main
import (
"bufio"
"fmt"
"io/ioutil"
"os"
"path"
@@ -34,7 +33,7 @@ func loadExtensions() ([]*tapApi.Extension, error) {
continue
}
fmt.Printf("Loading extension: %s\n", filename)
logger.Log.Infof("Loading extension: %s\n", filename)
extension := &tapApi.Extension{
Path: path.Join(extensionsDir, filename),
@@ -69,7 +68,7 @@ func loadExtensions() ([]*tapApi.Extension, error) {
})
for _, extension := range extensions {
fmt.Printf("Extension Properties: %+v\n", extension)
logger.Log.Infof("Extension Properties: %+v\n", extension)
}
return extensions, nil
@@ -93,7 +92,7 @@ func internalRun() error {
tap.StartPassiveTapper(&opts, outputItems, extenssions, &tapOpts)
fmt.Printf("Tapping, press enter to exit...\n")
logger.Log.Infof("Tapping, press enter to exit...\n")
reader := bufio.NewReader(os.Stdin)
reader.ReadLine()
return nil
@@ -105,9 +104,9 @@ func main() {
if err != nil {
switch err := err.(type) {
case *errors.Error:
fmt.Printf("Error: %v\n", err.ErrorStack())
logger.Log.Errorf("Error: %v\n", err.ErrorStack())
default:
fmt.Printf("Error: %v\n", err)
logger.Log.Errorf("Error: %v\n", err)
}
os.Exit(1)

ui/.snyk (new file, 135 lines)

@@ -0,0 +1,135 @@
# Snyk (https://snyk.io) policy file, patches or ignores known vulnerabilities.
version: v1.14.0
ignore:
SNYK-JS-AXIOS-1579269:
- '*':
reason: None Given
SNYK-JS-TRIMNEWLINES-1298042:
- '*':
reason: None Given
SNYK-JS-ANSIHTML-1296849:
- '*':
reason: None Given
SNYK-JS-ANSIREGEX-1583908:
- '*':
reason: None Given
SNYK-JS-BROWSERSLIST-1090194:
- '*':
reason: None Given
SNYK-JS-CSSWHAT-1298035:
- '*':
reason: None Given
SNYK-JS-DNSPACKET-1293563:
- '*':
reason: None Given
SNYK-JS-EJS-1049328:
- '*':
reason: None Given
SNYK-JS-GLOBPARENT-1016905:
- '*':
reason: None Given
SNYK-JS-IMMER-1540542:
- '*':
reason: None Given
SNYK-JS-LODASHTEMPLATE-1088054:
- '*':
reason: None Given
SNYK-JS-NODESASS-1059081:
- '*':
reason: None Given
SNYK-JS-NODESASS-535498:
- '*':
reason: None Given
SNYK-JS-NODESASS-535500:
- '*':
reason: None Given
SNYK-JS-NODESASS-535502:
- '*':
reason: None Given
SNYK-JS-NODESASS-540956:
- '*':
reason: Non given
SNYK-JS-NODESASS-540958:
- '*':
reason: Non given
SNYK-JS-NODESASS-540964:
- '*':
reason: Non given
SNYK-JS-NODESASS-540978:
- '*':
reason: Non given
SNYK-JS-NODESASS-540980:
- '*':
reason: Non given
SNYK-JS-NODESASS-540990:
- '*':
reason: Non given
SNYK-JS-NODESASS-540992:
- '*':
reason: Non given
SNYK-JS-NODESASS-540994:
- '*':
reason: Non given
SNYK-JS-NODESASS-540996:
- '*':
reason: Non given
SNYK-JS-NODESASS-540998:
- '*':
reason: Non given
SNYK-JS-NODESASS-541000:
- '*':
reason: Non given
SNYK-JS-NODESASS-541002:
- '*':
reason: Non given
SNYK-JS-NTHCHECK-1586032:
- '*':
reason: Non given
SNYK-JS-PATHPARSE-1077067:
- '*':
reason: Non given
SNYK-JS-POSTCSS-1090595:
- '*':
reason: Non given
SNYK-JS-POSTCSS-1255640:
- '*':
reason: Non given
SNYK-JS-PRISMJS-1314893:
- '*':
reason: Non given
SNYK-JS-PRISMJS-1585202:
- '*':
reason: Non given
SNYK-JS-PROMPTS-1729737:
- '*':
reason: Non given
SNYK-JS-SHELLQUOTE-1766506:
- '*':
reason: Non given
SNYK-JS-TAR-1536528:
- '*':
reason: Non given
SNYK-JS-TAR-1536531:
- '*':
reason: Non given
SNYK-JS-TAR-1536758:
- '*':
reason: Non given
SNYK-JS-TAR-1579147:
- '*':
reason: Non given
SNYK-JS-TAR-1579152:
- '*':
reason: Non given
SNYK-JS-TAR-1579155:
- '*':
reason: Non given
SNYK-JS-TMPL-1583443:
- '*':
reason: Non given
SNYK-JS-URLPARSE-1533425:
- '*':
reason: Non given
SNYK-JS-WS-1296835:
- '*':
reason: Non given


@@ -205,7 +205,7 @@ export const EntryTablePolicySection: React.FC<EntryPolicySectionProps> = ({titl
<tbody>
{arrayToIterate.map(({rule, matched}, index) => {
return (
<EntryPolicySectionContainer key={index} label={rule.Name} matched={matched && (rule.Type === 'latency' ? rule.Latency >= latency : true)? "Success" : "Failure"}>
<EntryPolicySectionContainer key={index} label={rule.Name} matched={matched && (rule.Type === 'slo' ? rule.ResponseTime >= latency : true)? "Success" : "Failure"}>
{
<>
{
@@ -213,8 +213,8 @@ export const EntryTablePolicySection: React.FC<EntryPolicySectionProps> = ({titl
<tr className={styles.dataValue}><td><b>Key:</b></td> <td>{rule.Key}</td></tr>
}
{
rule.Latency !== 0 &&
<tr className={styles.dataValue}><td><b>Latency:</b></td> <td>{rule.Latency}</td></tr>
rule.ResponseTime !== 0 &&
<tr className={styles.dataValue}><td><b>Response Time:</b></td> <td>{rule.ResponseTime}</td></tr>
}
{
rule.Method &&