mirror of
https://github.com/kubeshark/kubeshark.git
synced 2026-02-17 03:19:54 +00:00
Compare commits
22 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
35dbd5fde2 | ||
|
|
2c29903910 | ||
|
|
f49e29045c | ||
|
|
67593345a7 | ||
|
|
f069fdaae1 | ||
|
|
d478a7ec43 | ||
|
|
af96e55f61 | ||
|
|
e3ead981ec | ||
|
|
fbee4454e4 | ||
|
|
e9e16551ad | ||
|
|
e635b97d11 | ||
|
|
779257b864 | ||
|
|
e696851261 | ||
|
|
4e50e17d81 | ||
|
|
991eb2ab16 | ||
|
|
f0db3b81a8 | ||
|
|
9df1812d8e | ||
|
|
4f6da91d74 | ||
|
|
e2e69a3dc4 | ||
|
|
b6db64d868 | ||
|
|
160ae77145 | ||
|
|
2944493e2d |
4
.github/workflows/publish.yml
vendored
4
.github/workflows/publish.yml
vendored
@@ -14,6 +14,10 @@ jobs:
|
||||
docker:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Set up Go 1.16
|
||||
uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: '1.16'
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
- name: Set up Cloud SDK
|
||||
|
||||
25
.github/workflows/security_validation.yml
vendored
Normal file
25
.github/workflows/security_validation.yml
vendored
Normal file
@@ -0,0 +1,25 @@
|
||||
name: Security validation
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
branches:
|
||||
- 'develop'
|
||||
- 'main'
|
||||
|
||||
jobs:
|
||||
security:
|
||||
name: Check for vulnerabilities
|
||||
runs-on: ubuntu-latest
|
||||
env:
|
||||
SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }}
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
|
||||
- uses: snyk/actions/setup@master
|
||||
- name: Set up Go 1.16
|
||||
uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: '1.16'
|
||||
|
||||
- name: Run snyl on all projects
|
||||
run: snyk test --all-projects
|
||||
@@ -44,7 +44,7 @@ RUN go build -ldflags="-s -w \
|
||||
COPY devops/build_extensions.sh ..
|
||||
RUN cd .. && /bin/bash build_extensions.sh
|
||||
|
||||
FROM alpine:3.13.5
|
||||
FROM alpine:3.14
|
||||
|
||||
RUN apk add bash libpcap-dev tcpdump
|
||||
WORKDIR /app
|
||||
|
||||
4
Makefile
4
Makefile
@@ -46,6 +46,10 @@ push-docker: ## Build and publish agent docker image.
|
||||
@echo "publishing Docker image .. "
|
||||
devops/build-push-featurebranch.sh
|
||||
|
||||
push-docker-debug:
|
||||
@echo "publishing debug Docker image .. "
|
||||
devops/build-push-featurebranch-debug.sh
|
||||
|
||||
build-docker-ci: ## Build agent docker image for CI.
|
||||
@echo "building docker image for ci"
|
||||
devops/build-agent-ci.sh
|
||||
|
||||
14
README.md
14
README.md
@@ -15,6 +15,10 @@ Think TCPDump and Chrome Dev Tools combined.
|
||||
- No installation or code instrumentation
|
||||
- Works completely on premises
|
||||
|
||||
## Requirements
|
||||
|
||||
A Kubernetes server version of 1.16.0 or higher is required.
|
||||
|
||||
## Download
|
||||
|
||||
Download Mizu for your platform and operating system
|
||||
@@ -167,6 +171,16 @@ against the contracts.
|
||||
|
||||
Please see [CONTRACT MONITORING](docs/CONTRACT_MONITORING.md) page for more details and syntax.
|
||||
|
||||
### Configure proxy host
|
||||
|
||||
By default, mizu will be accessible via local host: 'http://localhost:8899/mizu/', it is possible to change the host,
|
||||
for instance, to '0.0.0.0' which can grant access via machine IP address.
|
||||
This setting can be changed via command line flag `--set tap.proxy-host=<value>` or via config file:
|
||||
tap
|
||||
proxy-host: 0.0.0.0
|
||||
and when changed it will support accessing by IP
|
||||
|
||||
|
||||
## How to Run local UI
|
||||
|
||||
- run from mizu/agent `go run main.go --hars-read --hars-dir <folder>`
|
||||
|
||||
6
agent/.snyk
Normal file
6
agent/.snyk
Normal file
@@ -0,0 +1,6 @@
|
||||
# Snyk (https://snyk.io) policy file, patches or ignores known vulnerabilities.
|
||||
version: v1.14.0
|
||||
ignore:
|
||||
SNYK-GOLANG-GITHUBCOMGINGONICGIN-1041736:
|
||||
- '*':
|
||||
reason: None Given
|
||||
@@ -79,6 +79,8 @@ github.com/gin-contrib/static v0.0.1/go.mod h1:CSxeF+wep05e0kCOsqWdAWbSszmc31zTI
|
||||
github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M=
|
||||
github.com/gin-gonic/gin v1.7.2 h1:Tg03T9yM2xa8j6I3Z3oqLaQRSmKvxPd6g/2HJ6zICFA=
|
||||
github.com/gin-gonic/gin v1.7.2/go.mod h1:jD2toBW3GZUr5UMcdrwQA10I7RuaFOl/SGeDjXkfUtY=
|
||||
github.com/go-errors/errors v1.4.1 h1:IvVlgbzSsaUNudsw5dcXSzF3EWyXTi5XrAdngnuhRyg=
|
||||
github.com/go-errors/errors v1.4.1/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og=
|
||||
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
|
||||
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
|
||||
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"mizuserver/pkg/api"
|
||||
"mizuserver/pkg/config"
|
||||
"mizuserver/pkg/controllers"
|
||||
"mizuserver/pkg/models"
|
||||
"mizuserver/pkg/routes"
|
||||
@@ -44,6 +45,9 @@ func main() {
|
||||
logLevel := determineLogLevel()
|
||||
logger.InitLoggerStderrOnly(logLevel)
|
||||
flag.Parse()
|
||||
if err := config.LoadConfig(); err != nil {
|
||||
logger.Log.Fatalf("Error loading config file %v", err)
|
||||
}
|
||||
loadExtensions()
|
||||
|
||||
if !*tapperMode && !*apiServerMode && !*standaloneMode && !*harsReaderMode {
|
||||
@@ -313,3 +317,4 @@ func determineLogLevel() (logLevel logging.Level) {
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -24,7 +24,7 @@ const (
|
||||
)
|
||||
|
||||
func loadOAS(ctx context.Context) (doc *openapi3.T, contractContent string, router routers.Router, err error) {
|
||||
path := fmt.Sprintf("%s/%s", shared.RulePolicyPath, shared.ContractFileName)
|
||||
path := fmt.Sprintf("%s%s", shared.ConfigDirPath, shared.ContractFileName)
|
||||
bytes, err := ioutil.ReadFile(path)
|
||||
if err != nil {
|
||||
logger.Log.Error(err.Error())
|
||||
|
||||
57
agent/pkg/config/config.go
Normal file
57
agent/pkg/config/config.go
Normal file
@@ -0,0 +1,57 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
)
|
||||
|
||||
// these values are used when the config.json file is not present
|
||||
const (
|
||||
defaultMaxDatabaseSizeBytes int64 = 200 * 1000 * 1000
|
||||
defaultRegexTarget string = ".*"
|
||||
)
|
||||
|
||||
var Config *shared.MizuAgentConfig
|
||||
|
||||
func LoadConfig() error {
|
||||
if Config != nil {
|
||||
return nil
|
||||
}
|
||||
filePath := fmt.Sprintf("%s%s", shared.ConfigDirPath, shared.ConfigFileName)
|
||||
|
||||
content, err := ioutil.ReadFile(filePath)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return applyDefaultConfig()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
if err = json.Unmarshal(content, &Config); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func applyDefaultConfig() error {
|
||||
defaultConfig, err := getDefaultConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
Config = defaultConfig
|
||||
return nil
|
||||
}
|
||||
|
||||
func getDefaultConfig() (*shared.MizuAgentConfig, error) {
|
||||
regex, err := shared.CompileRegexToSerializableRegexp(defaultRegexTarget)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &shared.MizuAgentConfig{
|
||||
TapTargetRegex: *regex,
|
||||
MaxDBSizeBytes: defaultMaxDatabaseSizeBytes,
|
||||
}, nil
|
||||
}
|
||||
@@ -13,11 +13,12 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
DBPath = "./entries.db"
|
||||
OrderDesc = "desc"
|
||||
OrderAsc = "asc"
|
||||
LT = "lt"
|
||||
GT = "gt"
|
||||
DBPath = "./entries.db"
|
||||
OrderDesc = "desc"
|
||||
OrderAsc = "asc"
|
||||
LT = "lt"
|
||||
GT = "gt"
|
||||
TimeFormat = "2006-01-02 15:04:05.000000000"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -57,7 +58,7 @@ func initDataBase(databasePath string) *gorm.DB {
|
||||
return temp
|
||||
}
|
||||
|
||||
func GetEntriesFromDb(timestampFrom int64, timestampTo int64, protocolName *string) []tapApi.MizuEntry {
|
||||
func GetEntriesFromDb(timeFrom time.Time, timeTo time.Time, protocolName *string) []tapApi.MizuEntry {
|
||||
order := OrderDesc
|
||||
protocolNameCondition := "1 = 1"
|
||||
if protocolName != nil {
|
||||
@@ -67,7 +68,7 @@ func GetEntriesFromDb(timestampFrom int64, timestampTo int64, protocolName *stri
|
||||
var entries []tapApi.MizuEntry
|
||||
GetEntriesTable().
|
||||
Where(protocolNameCondition).
|
||||
Where(fmt.Sprintf("timestamp BETWEEN %v AND %v", timestampFrom, timestampTo)).
|
||||
Where(fmt.Sprintf("created_at BETWEEN '%s' AND '%s'", timeFrom.Format(TimeFormat), timeTo.Format(TimeFormat))).
|
||||
Order(fmt.Sprintf("timestamp %s", order)).
|
||||
Find(&entries)
|
||||
|
||||
|
||||
@@ -1,12 +1,11 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"mizuserver/pkg/config"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"github.com/up9inc/mizu/shared/debounce"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
"github.com/up9inc/mizu/shared/units"
|
||||
@@ -14,7 +13,6 @@ import (
|
||||
)
|
||||
|
||||
const percentageOfMaxSizeBytesToPrune = 15
|
||||
const defaultMaxDatabaseSizeBytes int64 = 200 * 1000 * 1000
|
||||
|
||||
func StartEnforcingDatabaseSize() {
|
||||
watcher, err := fsnotify.NewWatcher()
|
||||
@@ -23,14 +21,8 @@ func StartEnforcingDatabaseSize() {
|
||||
return
|
||||
}
|
||||
|
||||
maxEntriesDBByteSize, err := getMaxEntriesDBByteSize()
|
||||
if err != nil {
|
||||
logger.Log.Fatalf("Error parsing max db size: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
checkFileSizeDebouncer := debounce.NewDebouncer(5*time.Second, func() {
|
||||
checkFileSize(maxEntriesDBByteSize)
|
||||
checkFileSize(config.Config.MaxDBSizeBytes)
|
||||
})
|
||||
|
||||
go func() {
|
||||
@@ -58,17 +50,6 @@ func StartEnforcingDatabaseSize() {
|
||||
}
|
||||
}
|
||||
|
||||
func getMaxEntriesDBByteSize() (int64, error) {
|
||||
maxEntriesDBByteSize := defaultMaxDatabaseSizeBytes
|
||||
var err error
|
||||
|
||||
maxEntriesDBSizeByteSEnvVarValue := os.Getenv(shared.MaxEntriesDBSizeBytesEnvVar)
|
||||
if maxEntriesDBSizeByteSEnvVarValue != "" {
|
||||
maxEntriesDBByteSize, err = strconv.ParseInt(maxEntriesDBSizeByteSEnvVarValue, 10, 64)
|
||||
}
|
||||
return maxEntriesDBByteSize, err
|
||||
}
|
||||
|
||||
func checkFileSize(maxSizeBytes int64) {
|
||||
fileStat, err := os.Stat(DBPath)
|
||||
if err != nil {
|
||||
|
||||
@@ -140,6 +140,13 @@ func (resolver *Resolver) watchServices(ctx context.Context) error {
|
||||
serviceHostname := fmt.Sprintf("%s.%s", service.Name, service.Namespace)
|
||||
if service.Spec.ClusterIP != "" && service.Spec.ClusterIP != kubClientNullString {
|
||||
resolver.saveResolvedName(service.Spec.ClusterIP, serviceHostname, event.Type)
|
||||
if service.Spec.Ports != nil {
|
||||
for _, port := range service.Spec.Ports {
|
||||
if port.Port > 0 {
|
||||
resolver.saveResolvedName(fmt.Sprintf("%s:%d", service.Spec.ClusterIP, port.Port), serviceHostname, event.Type)
|
||||
}
|
||||
}
|
||||
}
|
||||
resolver.saveServiceIP(service.Spec.ClusterIP, serviceHostname, event.Type)
|
||||
}
|
||||
if service.Status.LoadBalancer.Ingress != nil {
|
||||
|
||||
@@ -45,7 +45,7 @@ func ValidateService(serviceFromRule string, service string) bool {
|
||||
}
|
||||
|
||||
func MatchRequestPolicy(harEntry har.Entry, service string) (resultPolicyToSend []RulesMatched, isEnabled bool) {
|
||||
enforcePolicy, err := shared.DecodeEnforcePolicy(fmt.Sprintf("%s/%s", shared.RulePolicyPath, shared.RulePolicyFileName))
|
||||
enforcePolicy, err := shared.DecodeEnforcePolicy(fmt.Sprintf("%s%s", shared.ConfigDirPath, shared.ValidationRulesFileName))
|
||||
if err == nil && len(enforcePolicy.Rules) > 0 {
|
||||
isEnabled = true
|
||||
}
|
||||
@@ -111,7 +111,7 @@ func PassedValidationRules(rulesMatched []RulesMatched) (bool, int64, int) {
|
||||
if rule.Matched == false {
|
||||
return false, responseTime, numberOfRulesMatched
|
||||
} else {
|
||||
if strings.ToLower(rule.Rule.Type) == "responseTime" {
|
||||
if strings.ToLower(rule.Rule.Type) == "slo" {
|
||||
if rule.Rule.ResponseTime < responseTime || responseTime == -1 {
|
||||
responseTime = rule.Rule.ResponseTime
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package up9
|
||||
import (
|
||||
"bytes"
|
||||
"compress/zlib"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
@@ -205,13 +206,13 @@ func syncEntriesImpl(token string, model string, envPrefix string, uploadInterva
|
||||
|
||||
sleepTime := time.Second * time.Duration(uploadIntervalSec)
|
||||
|
||||
var timestampFrom int64 = 0
|
||||
var timeFrom time.Time
|
||||
protocolFilter := "http"
|
||||
|
||||
for {
|
||||
timestampTo := time.Now().UnixNano() / int64(time.Millisecond)
|
||||
logger.Log.Infof("Getting entries from %v, to %v\n", timestampFrom, timestampTo)
|
||||
protocolFilter := "http"
|
||||
entriesArray := database.GetEntriesFromDb(timestampFrom, timestampTo, &protocolFilter)
|
||||
timeTo := time.Now()
|
||||
logger.Log.Infof("Getting entries from %v, to %v\n", timeFrom.Format(time.RFC3339Nano), timeTo.Format(time.RFC3339Nano))
|
||||
entriesArray := database.GetEntriesFromDb(timeFrom, timeTo, &protocolFilter)
|
||||
|
||||
if len(entriesArray) > 0 {
|
||||
result := make([]har.Entry, 0)
|
||||
@@ -231,6 +232,12 @@ func syncEntriesImpl(token string, model string, envPrefix string, uploadInterva
|
||||
harEntry.Request.Headers = append(harEntry.Request.Headers, har.Header{Name: "x-mizu-destination", Value: data.ResolvedDestination})
|
||||
harEntry.Request.URL = utils.SetHostname(harEntry.Request.URL, data.ResolvedDestination)
|
||||
}
|
||||
|
||||
// go's default marshal behavior is to encode []byte fields to base64, python's default unmarshal behavior is to not decode []byte fields from base64
|
||||
if harEntry.Response.Content.Text, err = base64.StdEncoding.DecodeString(string(harEntry.Response.Content.Text)); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
result = append(result, *harEntry)
|
||||
}
|
||||
|
||||
@@ -269,13 +276,14 @@ func syncEntriesImpl(token string, model string, envPrefix string, uploadInterva
|
||||
analyzeInformation.SentCount += len(entriesArray)
|
||||
logger.Log.Infof("Finish uploading %v entries to %s\n", len(entriesArray), GetTrafficDumpUrl(envPrefix, model))
|
||||
|
||||
logger.Log.Infof("Uploaded %v entries until now", analyzeInformation.SentCount)
|
||||
} else {
|
||||
logger.Log.Infof("Nothing to upload")
|
||||
}
|
||||
|
||||
logger.Log.Infof("Sleeping for %v...\n", sleepTime)
|
||||
time.Sleep(sleepTime)
|
||||
timestampFrom = timestampTo
|
||||
timeFrom = timeTo
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -27,8 +27,9 @@ build-all: ## Build for all supported platforms.
|
||||
@mkdir -p bin && sed s/_SEM_VER_/$(SEM_VER)/g README.md.TEMPLATE > bin/README.md
|
||||
@$(MAKE) build GOOS=darwin GOARCH=amd64
|
||||
@$(MAKE) build GOOS=linux GOARCH=amd64
|
||||
@# $(MAKE) build GOOS=darwin GOARCH=arm64
|
||||
@# $(MAKE) GOOS=windows GOARCH=amd64
|
||||
@$(MAKE) build GOOS=darwin GOARCH=arm64
|
||||
@$(MAKE) build GOOS=windows GOARCH=amd64
|
||||
@mv ./bin/mizu_windows_amd64 ./bin/mizu.exe
|
||||
@# $(MAKE) GOOS=linux GOARCH=386
|
||||
@# $(MAKE) GOOS=windows GOARCH=386
|
||||
@# $(MAKE) GOOS=linux GOARCH=arm64
|
||||
|
||||
@@ -2,16 +2,25 @@
|
||||
|
||||
Download Mizu for your platform
|
||||
|
||||
**Mac** (on Intel chip)
|
||||
**Mac** (Intel)
|
||||
```
|
||||
curl -Lo mizu https://github.com/up9inc/mizu/releases/download/_SEM_VER_/mizu_darwin_amd64 && chmod 755 mizu
|
||||
```
|
||||
|
||||
**Mac** (Apple M1 silicon)
|
||||
```
|
||||
curl -Lo mizu https://github.com/up9inc/mizu/releases/download/_SEM_VER_/mizu_darwin_arm64 && chmod 755 mizu
|
||||
```
|
||||
|
||||
**Linux**
|
||||
```
|
||||
curl -Lo mizu https://github.com/up9inc/mizu/releases/download/_SEM_VER_/mizu_linux_amd64 && chmod 755 mizu
|
||||
```
|
||||
|
||||
**Windows** (Intel 64bit)
|
||||
```
|
||||
curl -LO https://github.com/up9inc/mizu/releases/download/_SEM_VER_/mizu.exe
|
||||
```
|
||||
|
||||
### Checksums
|
||||
SHA256 checksums available for compiled binaries.
|
||||
|
||||
@@ -6,7 +6,6 @@ import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
@@ -33,19 +32,8 @@ func Login() error {
|
||||
Token: token.AccessToken,
|
||||
}
|
||||
|
||||
configFile, defaultConfigErr := config.GetConfigWithDefaults()
|
||||
if defaultConfigErr != nil {
|
||||
return fmt.Errorf("failed getting config with defaults, err: %v", defaultConfigErr)
|
||||
}
|
||||
|
||||
if err := config.LoadConfigFile(config.Config.ConfigFilePath, configFile); err != nil && !os.IsNotExist(err) {
|
||||
return fmt.Errorf("failed getting config file, err: %v", err)
|
||||
}
|
||||
|
||||
configFile.Auth = authConfig
|
||||
|
||||
if err := config.WriteConfig(configFile); err != nil {
|
||||
return fmt.Errorf("failed writing config with auth, err: %v", err)
|
||||
if err := config.UpdateConfig(func(configStruct *config.ConfigStruct) { configStruct.Auth = authConfig }); err != nil {
|
||||
return fmt.Errorf("failed updating config with auth, err: %v", err)
|
||||
}
|
||||
|
||||
config.Config.Auth = authConfig
|
||||
|
||||
20
cli/cmd/clean.go
Normal file
20
cli/cmd/clean.go
Normal file
@@ -0,0 +1,20 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/up9inc/mizu/cli/telemetry"
|
||||
)
|
||||
|
||||
var cleanCmd = &cobra.Command{
|
||||
Use: "clean",
|
||||
Short: "Removes all mizu resources",
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
go telemetry.ReportRun("clean", nil)
|
||||
performCleanCommand()
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
func init() {
|
||||
rootCmd.AddCommand(cleanCmd)
|
||||
}
|
||||
17
cli/cmd/cleanRunner.go
Normal file
17
cli/cmd/cleanRunner.go
Normal file
@@ -0,0 +1,17 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"github.com/up9inc/mizu/cli/config"
|
||||
"github.com/up9inc/mizu/cli/kubernetes"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
)
|
||||
|
||||
func performCleanCommand() {
|
||||
kubernetesProvider, err := kubernetes.NewProvider(config.Config.KubeConfigPath())
|
||||
if err != nil {
|
||||
logger.Log.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
finishMizuExecution(kubernetesProvider)
|
||||
}
|
||||
@@ -21,7 +21,7 @@ func GetApiServerUrl() string {
|
||||
}
|
||||
|
||||
func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
|
||||
err := kubernetes.StartProxy(kubernetesProvider, config.Config.Tap.GuiPort, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName)
|
||||
err := kubernetes.StartProxy(kubernetesProvider, config.Config.Tap.ProxyHost, config.Config.Tap.GuiPort, config.Config.MizuResourcesNamespace, mizu.ApiServerPodName)
|
||||
if err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error occured while running k8s proxy %v\n"+
|
||||
"Try setting different port by using --%s", errormessage.FormatError(err), configStructs.GuiPortTapName))
|
||||
|
||||
@@ -80,10 +80,19 @@ Supported protocols are HTTP and gRPC.`,
|
||||
|
||||
func askConfirmation(flagName string) {
|
||||
logger.Log.Infof(fmt.Sprintf(uploadTrafficMessageToConfirm, flagName))
|
||||
|
||||
if !config.Config.Tap.AskUploadConfirmation {
|
||||
return
|
||||
}
|
||||
|
||||
if !uiUtils.AskForConfirmation("Would you like to proceed [Y/n]: ") {
|
||||
logger.Log.Infof("You can always run mizu without %s, aborting", flagName)
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
if err := config.UpdateConfig(func(configStruct *config.ConfigStruct) { configStruct.Tap.AskUploadConfirmation = false }); err != nil {
|
||||
logger.Log.Debugf("failed updating config with upload confirmation, err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
||||
@@ -2,8 +2,11 @@ package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
k8serrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"path"
|
||||
"regexp"
|
||||
"strings"
|
||||
@@ -51,9 +54,9 @@ func RunMizuTap() {
|
||||
return
|
||||
}
|
||||
|
||||
var mizuValidationRules string
|
||||
var serializedValidationRules string
|
||||
if config.Config.Tap.EnforcePolicyFile != "" {
|
||||
mizuValidationRules, err = readValidationRules(config.Config.Tap.EnforcePolicyFile)
|
||||
serializedValidationRules, err = readValidationRules(config.Config.Tap.EnforcePolicyFile)
|
||||
if err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error reading policy file: %v", errormessage.FormatError(err)))
|
||||
return
|
||||
@@ -61,14 +64,14 @@ func RunMizuTap() {
|
||||
}
|
||||
|
||||
// Read and validate the OAS file
|
||||
var contract string
|
||||
var serializedContract string
|
||||
if config.Config.Tap.ContractFile != "" {
|
||||
bytes, err := ioutil.ReadFile(config.Config.Tap.ContractFile)
|
||||
if err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error reading contract file: %v", errormessage.FormatError(err)))
|
||||
return
|
||||
}
|
||||
contract = string(bytes)
|
||||
serializedContract = string(bytes)
|
||||
|
||||
ctx := context.Background()
|
||||
loader := &openapi3.Loader{Context: ctx}
|
||||
@@ -84,6 +87,12 @@ func RunMizuTap() {
|
||||
}
|
||||
}
|
||||
|
||||
serializedMizuConfig, err := config.GetSerializedMizuConfig()
|
||||
if err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error composing mizu config: %v", errormessage.FormatError(err)))
|
||||
return
|
||||
}
|
||||
|
||||
kubernetesProvider, err := kubernetes.NewProvider(config.Config.KubeConfigPath())
|
||||
if err != nil {
|
||||
logger.Log.Error(err)
|
||||
@@ -129,11 +138,18 @@ func RunMizuTap() {
|
||||
return
|
||||
}
|
||||
|
||||
defer finishMizuExecution(kubernetesProvider)
|
||||
if err := createMizuResources(ctx, kubernetesProvider, mizuValidationRules, contract); err != nil {
|
||||
if err := createMizuResources(ctx, kubernetesProvider, serializedValidationRules, serializedContract, serializedMizuConfig); err != nil {
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error creating resources: %v", errormessage.FormatError(err)))
|
||||
|
||||
var statusError *k8serrors.StatusError
|
||||
if errors.As(err, &statusError) {
|
||||
if statusError.ErrStatus.Reason == metav1.StatusReasonAlreadyExists {
|
||||
logger.Log.Info("Mizu is already running in this namespace, change the `mizu-resources-namespace` configuration or run `mizu clean` to remove the currently running Mizu instance")
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
defer finishMizuExecution(kubernetesProvider)
|
||||
|
||||
go goUtils.HandleExcWrapper(watchApiServerPod, ctx, kubernetesProvider, cancel, mizuApiFilteringOptions)
|
||||
go goUtils.HandleExcWrapper(watchTapperPod, ctx, kubernetesProvider, cancel)
|
||||
@@ -152,7 +168,7 @@ func readValidationRules(file string) (string, error) {
|
||||
return string(newContent), nil
|
||||
}
|
||||
|
||||
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, mizuValidationRules string, contract string) error {
|
||||
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, serializedValidationRules string, serializedContract string, serializedMizuConfig string) error {
|
||||
if !config.Config.IsNsRestrictedMode() {
|
||||
if err := createMizuNamespace(ctx, kubernetesProvider); err != nil {
|
||||
return err
|
||||
@@ -163,15 +179,15 @@ func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Pro
|
||||
return err
|
||||
}
|
||||
|
||||
if err := createMizuConfigmap(ctx, kubernetesProvider, mizuValidationRules, contract); err != nil {
|
||||
if err := createMizuConfigmap(ctx, kubernetesProvider, serializedValidationRules, serializedContract, serializedMizuConfig); err != nil {
|
||||
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Failed to create resources required for policy validation. Mizu will not validate policy rules. error: %v\n", errormessage.FormatError(err)))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func createMizuConfigmap(ctx context.Context, kubernetesProvider *kubernetes.Provider, data string, contract string) error {
|
||||
err := kubernetesProvider.CreateConfigMap(ctx, config.Config.MizuResourcesNamespace, mizu.ConfigMapName, data, contract)
|
||||
func createMizuConfigmap(ctx context.Context, kubernetesProvider *kubernetes.Provider, serializedValidationRules string, serializedContract string, serializedMizuConfig string) error {
|
||||
err := kubernetesProvider.CreateConfigMap(ctx, config.Config.MizuResourcesNamespace, mizu.ConfigMapName, serializedValidationRules, serializedContract, serializedMizuConfig)
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"k8s.io/apimachinery/pkg/util/json"
|
||||
"os"
|
||||
"reflect"
|
||||
"strconv"
|
||||
@@ -40,7 +41,7 @@ func InitConfig(cmd *cobra.Command) error {
|
||||
|
||||
configFilePathFlag := cmd.Flags().Lookup(ConfigFilePathCommandName)
|
||||
configFilePath := configFilePathFlag.Value.String()
|
||||
if err := LoadConfigFile(configFilePath, &Config); err != nil {
|
||||
if err := loadConfigFile(configFilePath, &Config); err != nil {
|
||||
if configFilePathFlag.Changed || !os.IsNotExist(err) {
|
||||
return fmt.Errorf("invalid config, %w\n"+
|
||||
"you can regenerate the file by removing it (%v) and using `mizu config -r`", err, configFilePath)
|
||||
@@ -81,7 +82,27 @@ func WriteConfig(config *ConfigStruct) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func LoadConfigFile(configFilePath string, config *ConfigStruct) error {
|
||||
type updateConfigStruct func(*ConfigStruct)
|
||||
func UpdateConfig(updateConfigStruct updateConfigStruct) error {
|
||||
configFile, err := GetConfigWithDefaults()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed getting config with defaults, err: %v", err)
|
||||
}
|
||||
|
||||
if err := loadConfigFile(Config.ConfigFilePath, configFile); err != nil && !os.IsNotExist(err) {
|
||||
return fmt.Errorf("failed getting config file, err: %v", err)
|
||||
}
|
||||
|
||||
updateConfigStruct(configFile)
|
||||
|
||||
if err := WriteConfig(configFile); err != nil {
|
||||
return fmt.Errorf("failed writing config, err: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func loadConfigFile(configFilePath string, config *ConfigStruct) error {
|
||||
reader, openErr := os.Open(configFilePath)
|
||||
if openErr != nil {
|
||||
return openErr
|
||||
@@ -343,3 +364,27 @@ func setZeroForReadonlyFields(currentElem reflect.Value) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func GetSerializedMizuConfig() (string, error) {
|
||||
mizuConfig, err := getMizuConfig()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
serializedConfig, err := json.Marshal(mizuConfig)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return string(serializedConfig), nil
|
||||
}
|
||||
|
||||
func getMizuConfig() (*shared.MizuAgentConfig, error) {
|
||||
serializableRegex, err := shared.CompileRegexToSerializableRegexp(Config.Tap.PodRegexStr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
config := shared.MizuAgentConfig{
|
||||
TapTargetRegex: *serializableRegex,
|
||||
MaxDBSizeBytes: Config.Tap.MaxEntriesDBSizeBytes(),
|
||||
}
|
||||
return &config, nil
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
const (
|
||||
MizuResourcesNamespaceConfigName = "mizu-resources-namespace"
|
||||
ConfigFilePathCommandName = "config-path"
|
||||
KubeConfigPathConfigName = "kube-config-path"
|
||||
)
|
||||
|
||||
type ConfigStruct struct {
|
||||
|
||||
@@ -26,6 +26,7 @@ type TapConfig struct {
|
||||
UploadIntervalSec int `yaml:"upload-interval" default:"10"`
|
||||
PodRegexStr string `yaml:"regex" default:".*"`
|
||||
GuiPort uint16 `yaml:"gui-port" default:"8899"`
|
||||
ProxyHost string `yaml:"proxy-host" default:"127.0.0.1"`
|
||||
Namespaces []string `yaml:"namespaces"`
|
||||
Analysis bool `yaml:"analysis" default:"false"`
|
||||
AllNamespaces bool `yaml:"all-namespaces" default:"false"`
|
||||
@@ -37,6 +38,7 @@ type TapConfig struct {
|
||||
Workspace string `yaml:"workspace"`
|
||||
EnforcePolicyFile string `yaml:"traffic-validation-file"`
|
||||
ContractFile string `yaml:"contract"`
|
||||
AskUploadConfirmation bool `yaml:"ask-upload-confirmation" default:"true"`
|
||||
ApiServerResources Resources `yaml:"api-server-resources"`
|
||||
TapperResources Resources `yaml:"tapper-resources"`
|
||||
}
|
||||
|
||||
@@ -7,12 +7,13 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"strconv"
|
||||
|
||||
"github.com/up9inc/mizu/cli/config/configStructs"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
"github.com/up9inc/mizu/shared/semver"
|
||||
"k8s.io/apimachinery/pkg/version"
|
||||
"net/url"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
|
||||
"io"
|
||||
|
||||
@@ -77,6 +78,14 @@ func NewProvider(kubeConfigPath string) (*Provider, error) {
|
||||
"you can set alternative kube config file path by adding the kube-config-path field to the mizu config file, err: %w", kubeConfigPath, err)
|
||||
}
|
||||
|
||||
if err := validateNotProxy(kubernetesConfig, restClientConfig); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := validateKubernetesVersion(clientSet); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Provider{
|
||||
clientSet: clientSet,
|
||||
kubernetesConfig: kubernetesConfig,
|
||||
@@ -167,10 +176,8 @@ func (provider *Provider) CreateMizuApiServerPod(ctx context.Context, opts *ApiS
|
||||
}
|
||||
}
|
||||
|
||||
configMapVolumeName := &core.ConfigMapVolumeSource{}
|
||||
configMapVolumeName.Name = mizu.ConfigMapName
|
||||
configMapOptional := true
|
||||
configMapVolumeName.Optional = &configMapOptional
|
||||
configMapVolume := &core.ConfigMapVolumeSource{}
|
||||
configMapVolume.Name = mizu.ConfigMapName
|
||||
|
||||
cpuLimit, err := resource.ParseQuantity(opts.Resources.CpuLimit)
|
||||
if err != nil {
|
||||
@@ -216,7 +223,7 @@ func (provider *Provider) CreateMizuApiServerPod(ctx context.Context, opts *ApiS
|
||||
VolumeMounts: []core.VolumeMount{
|
||||
{
|
||||
Name: mizu.ConfigMapName,
|
||||
MountPath: shared.RulePolicyPath,
|
||||
MountPath: shared.ConfigDirPath,
|
||||
},
|
||||
},
|
||||
Command: command,
|
||||
@@ -225,10 +232,6 @@ func (provider *Provider) CreateMizuApiServerPod(ctx context.Context, opts *ApiS
|
||||
Name: shared.SyncEntriesConfigEnvVar,
|
||||
Value: string(marshaledSyncEntriesConfig),
|
||||
},
|
||||
{
|
||||
Name: shared.MaxEntriesDBSizeBytesEnvVar,
|
||||
Value: strconv.FormatInt(opts.MaxEntriesDBSizeBytes, 10),
|
||||
},
|
||||
{
|
||||
Name: shared.DebugModeEnvVar,
|
||||
Value: debugMode,
|
||||
@@ -269,7 +272,7 @@ func (provider *Provider) CreateMizuApiServerPod(ctx context.Context, opts *ApiS
|
||||
{
|
||||
Name: mizu.ConfigMapName,
|
||||
VolumeSource: core.VolumeSource{
|
||||
ConfigMap: configMapVolumeName,
|
||||
ConfigMap: configMapVolume,
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -485,14 +488,16 @@ func (provider *Provider) handleRemovalError(err error) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (provider *Provider) CreateConfigMap(ctx context.Context, namespace string, configMapName string, data string, contract string) error {
|
||||
if data == "" && contract == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (provider *Provider) CreateConfigMap(ctx context.Context, namespace string, configMapName string, serializedValidationRules string, serializedContract string, serializedMizuConfig string) error {
|
||||
configMapData := make(map[string]string, 0)
|
||||
configMapData[shared.RulePolicyFileName] = data
|
||||
configMapData[shared.ContractFileName] = contract
|
||||
if serializedValidationRules != "" {
|
||||
configMapData[shared.ValidationRulesFileName] = serializedValidationRules
|
||||
}
|
||||
if serializedContract != "" {
|
||||
configMapData[shared.ContractFileName] = serializedContract
|
||||
}
|
||||
configMapData[shared.ConfigFileName] = serializedMizuConfig
|
||||
|
||||
configMap := &core.ConfigMap{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: "ConfigMap",
|
||||
@@ -611,6 +616,24 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
|
||||
noScheduleToleration.WithOperator(core.TolerationOpExists)
|
||||
noScheduleToleration.WithEffect(core.TaintEffectNoSchedule)
|
||||
|
||||
volumeName := mizu.ConfigMapName
|
||||
configMapVolume := applyconfcore.VolumeApplyConfiguration{
|
||||
Name: &volumeName,
|
||||
VolumeSourceApplyConfiguration: applyconfcore.VolumeSourceApplyConfiguration{
|
||||
ConfigMap: &applyconfcore.ConfigMapVolumeSourceApplyConfiguration{
|
||||
LocalObjectReferenceApplyConfiguration: applyconfcore.LocalObjectReferenceApplyConfiguration{
|
||||
Name: &volumeName,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
mountPath := shared.ConfigDirPath
|
||||
configMapVolumeMount := applyconfcore.VolumeMountApplyConfiguration{
|
||||
Name: &volumeName,
|
||||
MountPath: &mountPath,
|
||||
}
|
||||
agentContainer.WithVolumeMounts(&configMapVolumeMount)
|
||||
|
||||
podSpec := applyconfcore.PodSpec()
|
||||
podSpec.WithHostNetwork(true)
|
||||
podSpec.WithDNSPolicy(core.DNSClusterFirstWithHostNet)
|
||||
@@ -621,6 +644,7 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
|
||||
podSpec.WithContainers(agentContainer)
|
||||
podSpec.WithAffinity(affinity)
|
||||
podSpec.WithTolerations(noExecuteToleration, noScheduleToleration)
|
||||
podSpec.WithVolumes(&configMapVolume)
|
||||
|
||||
podTemplate := applyconfcore.PodTemplateSpec()
|
||||
podTemplate.WithLabels(map[string]string{"app": tapperPodName})
|
||||
@@ -727,3 +751,47 @@ func loadKubernetesConfiguration(kubeConfigPath string) clientcmd.ClientConfig {
|
||||
func isPodRunning(pod *core.Pod) bool {
|
||||
return pod.Status.Phase == core.PodRunning
|
||||
}
|
||||
|
||||
// We added this after a customer tried to run mizu from lens, which used len's kube config, which have cluster server configuration, which points to len's local proxy.
|
||||
// The workaround was to use the user's local default kube config.
|
||||
// For now - we are blocking the option to run mizu through a proxy to k8s server
|
||||
func validateNotProxy(kubernetesConfig clientcmd.ClientConfig, restClientConfig *restclient.Config) error {
|
||||
kubernetesUrl, err := url.Parse(restClientConfig.Host)
|
||||
if err != nil {
|
||||
logger.Log.Debugf("validateNotProxy - error while parsing kubernetes host, err: %v", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
restProxyClientConfig, _ := kubernetesConfig.ClientConfig()
|
||||
restProxyClientConfig.Host = kubernetesUrl.Host
|
||||
|
||||
clientProxySet, err := getClientSet(restProxyClientConfig)
|
||||
if err == nil {
|
||||
proxyServerVersion, err := clientProxySet.ServerVersion()
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if *proxyServerVersion == (version.Info{}) {
|
||||
return fmt.Errorf("cannot establish http-proxy connection to the Kubernetes cluster. If you’re using Lens or similar tool, please run mizu with regular kubectl config using --%v %v=$HOME/.kube/config flag", config.SetCommandName, config.KubeConfigPathConfigName)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func validateKubernetesVersion(clientSet *kubernetes.Clientset) error {
|
||||
serverVersion, err := clientSet.ServerVersion()
|
||||
if err != nil {
|
||||
logger.Log.Debugf("error while getting kubernetes server version, err: %v", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
serverVersionSemVer := semver.SemVersion(serverVersion.GitVersion)
|
||||
minKubernetesServerVersionSemVer := semver.SemVersion(mizu.MinKubernetesServerVersion)
|
||||
if minKubernetesServerVersionSemVer.GreaterThan(serverVersionSemVer) {
|
||||
return fmt.Errorf("kubernetes server version %v is not supported, supporting only kubernetes server version of %v or higher", serverVersion.GitVersion, mizu.MinKubernetesServerVersion)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -14,12 +14,12 @@ import (
|
||||
const k8sProxyApiPrefix = "/"
|
||||
const mizuServicePort = 80
|
||||
|
||||
func StartProxy(kubernetesProvider *Provider, mizuPort uint16, mizuNamespace string, mizuServiceName string) error {
|
||||
func StartProxy(kubernetesProvider *Provider, proxyHost string, mizuPort uint16, mizuNamespace string, mizuServiceName string) error {
|
||||
logger.Log.Debugf("Starting proxy. namespace: [%v], service name: [%s], port: [%v]", mizuNamespace, mizuServiceName, mizuPort)
|
||||
filter := &proxy.FilterServer{
|
||||
AcceptPaths: proxy.MakeRegexpArrayOrDie(proxy.DefaultPathAcceptRE),
|
||||
RejectPaths: proxy.MakeRegexpArrayOrDie(proxy.DefaultPathRejectRE),
|
||||
AcceptHosts: proxy.MakeRegexpArrayOrDie(proxy.DefaultHostAcceptRE),
|
||||
AcceptHosts: proxy.MakeRegexpArrayOrDie("^.*"),
|
||||
RejectMethods: proxy.MakeRegexpArrayOrDie(proxy.DefaultMethodRejectRE),
|
||||
}
|
||||
|
||||
@@ -32,7 +32,7 @@ func StartProxy(kubernetesProvider *Provider, mizuPort uint16, mizuNamespace str
|
||||
mux.Handle("/static/", getRerouteHttpHandlerMizuStatic(proxyHandler, mizuNamespace, mizuServiceName))
|
||||
mux.Handle("/mizu/", getRerouteHttpHandlerMizuAPI(proxyHandler, mizuNamespace, mizuServiceName))
|
||||
|
||||
l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", int(mizuPort)))
|
||||
l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", proxyHost, int(mizuPort)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -14,17 +14,18 @@ var (
|
||||
)
|
||||
|
||||
const (
|
||||
MizuResourcesPrefix = "mizu-"
|
||||
ApiServerPodName = MizuResourcesPrefix + "api-server"
|
||||
ClusterRoleBindingName = MizuResourcesPrefix + "cluster-role-binding"
|
||||
ClusterRoleName = MizuResourcesPrefix + "cluster-role"
|
||||
K8sAllNamespaces = ""
|
||||
RoleBindingName = MizuResourcesPrefix + "role-binding"
|
||||
RoleName = MizuResourcesPrefix + "role"
|
||||
ServiceAccountName = MizuResourcesPrefix + "service-account"
|
||||
TapperDaemonSetName = MizuResourcesPrefix + "tapper-daemon-set"
|
||||
TapperPodName = MizuResourcesPrefix + "tapper"
|
||||
ConfigMapName = MizuResourcesPrefix + "policy"
|
||||
MizuResourcesPrefix = "mizu-"
|
||||
ApiServerPodName = MizuResourcesPrefix + "api-server"
|
||||
ClusterRoleBindingName = MizuResourcesPrefix + "cluster-role-binding"
|
||||
ClusterRoleName = MizuResourcesPrefix + "cluster-role"
|
||||
K8sAllNamespaces = ""
|
||||
RoleBindingName = MizuResourcesPrefix + "role-binding"
|
||||
RoleName = MizuResourcesPrefix + "role"
|
||||
ServiceAccountName = MizuResourcesPrefix + "service-account"
|
||||
TapperDaemonSetName = MizuResourcesPrefix + "tapper-daemon-set"
|
||||
TapperPodName = MizuResourcesPrefix + "tapper"
|
||||
ConfigMapName = MizuResourcesPrefix + "config"
|
||||
MinKubernetesServerVersion = "1.16.0"
|
||||
)
|
||||
|
||||
func GetMizuFolderPath() string {
|
||||
|
||||
@@ -85,7 +85,13 @@ func CheckNewerVersion(versionChan chan string) {
|
||||
logger.Log.Debugf("Finished version validation, github version %v, current version %v, took %v", gitHubVersion, currentSemVer, time.Since(start))
|
||||
|
||||
if gitHubVersionSemVer.GreaterThan(currentSemVer) {
|
||||
versionChan <- fmt.Sprintf("Update available! %v -> %v (curl -Lo mizu %v/mizu_%s_amd64 && chmod 755 mizu)", mizu.SemVer, gitHubVersion, strings.Replace(*latestRelease.HTMLURL, "tag", "download", 1), runtime.GOOS)
|
||||
var downloadMessage string
|
||||
if runtime.GOOS == "windows" {
|
||||
downloadMessage = fmt.Sprintf("curl -LO %v/mizu.exe", strings.Replace(*latestRelease.HTMLURL, "tag", "download", 1))
|
||||
} else {
|
||||
downloadMessage = fmt.Sprintf("curl -Lo mizu %v/mizu_%s_%s && chmod 755 mizu", strings.Replace(*latestRelease.HTMLURL, "tag", "download", 1), runtime.GOOS, runtime.GOARCH)
|
||||
}
|
||||
versionChan <- fmt.Sprintf("Update available! %v -> %v (%s)", mizu.SemVer, gitHubVersion, downloadMessage)
|
||||
} else {
|
||||
versionChan <- ""
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ FROM golang:1.16-alpine AS builder
|
||||
# Set necessary environment variables needed for our image.
|
||||
ENV CGO_ENABLED=1 GOOS=linux GOARCH=amd64
|
||||
|
||||
RUN apk add libpcap-dev gcc g++ make
|
||||
RUN apk add libpcap-dev gcc g++ make bash
|
||||
|
||||
# Move to agent working directory (/agent-build).
|
||||
WORKDIR /app/agent-build
|
||||
@@ -20,17 +20,25 @@ WORKDIR /app/agent-build
|
||||
COPY agent/go.mod agent/go.sum ./
|
||||
COPY shared/go.mod shared/go.mod ../shared/
|
||||
COPY tap/go.mod tap/go.mod ../tap/
|
||||
|
||||
COPY tap/api/go.* ../tap/api/
|
||||
RUN go mod download
|
||||
# cheap trick to make the build faster (As long as go.mod wasn't changes)
|
||||
RUN go list -f '{{.Path}}@{{.Version}}' -m all | sed 1d | grep -e 'go-cache' -e 'sqlite' | xargs go get
|
||||
|
||||
ARG COMMIT_HASH
|
||||
ARG GIT_BRANCH
|
||||
ARG BUILD_TIMESTAMP
|
||||
ARG SEM_VER
|
||||
|
||||
# Copy and build agent code
|
||||
COPY shared ../shared
|
||||
COPY tap ../tap
|
||||
COPY agent .
|
||||
RUN go build -gcflags="all=-N -l" -o mizuagent .
|
||||
|
||||
COPY devops/build_extensions_debug.sh ..
|
||||
RUN cd .. && /bin/bash build_extensions_debug.sh
|
||||
|
||||
|
||||
FROM golang:1.16-alpine
|
||||
|
||||
@@ -39,6 +47,7 @@ WORKDIR /app
|
||||
|
||||
# Copy binary and config files from /build to root folder of scratch container.
|
||||
COPY --from=builder ["/app/agent-build/mizuagent", "."]
|
||||
COPY --from=builder ["/app/agent/build/extensions", "extensions"]
|
||||
COPY --from=site-build ["/app/ui-build/build", "site"]
|
||||
|
||||
# install remote debugging tool
|
||||
|
||||
28
devops/build-push-featurebranch-debug.sh
Executable file
28
devops/build-push-featurebranch-debug.sh
Executable file
@@ -0,0 +1,28 @@
|
||||
#!/bin/bash
|
||||
set -e
|
||||
|
||||
GCP_PROJECT=up9-docker-hub
|
||||
REPOSITORY=gcr.io/$GCP_PROJECT
|
||||
SERVER_NAME=mizu
|
||||
GIT_BRANCH=$(git branch | grep \* | cut -d ' ' -f2 | tr '[:upper:]' '[:lower:]')
|
||||
|
||||
DOCKER_REPO=$REPOSITORY/$SERVER_NAME/$GIT_BRANCH
|
||||
SEM_VER=${SEM_VER=0.0.0}
|
||||
|
||||
DOCKER_TAGGED_BUILDS=("$DOCKER_REPO:latest" "$DOCKER_REPO:$SEM_VER")
|
||||
|
||||
if [ "$GIT_BRANCH" = 'develop' -o "$GIT_BRANCH" = 'master' -o "$GIT_BRANCH" = 'main' ]
|
||||
then
|
||||
echo "Pushing to $GIT_BRANCH is allowed only via CI"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
echo "building ${DOCKER_TAGGED_BUILDS[@]}"
|
||||
DOCKER_TAGS_ARGS=$(echo ${DOCKER_TAGGED_BUILDS[@]/#/-t }) # "-t FIRST_TAG -t SECOND_TAG ..."
|
||||
docker build -f debug.Dockerfile $DOCKER_TAGS_ARGS --build-arg SEM_VER=${SEM_VER} --build-arg BUILD_TIMESTAMP=${BUILD_TIMESTAMP} --build-arg GIT_BRANCH=${GIT_BRANCH} --build-arg COMMIT_HASH=${COMMIT_HASH} .
|
||||
|
||||
for DOCKER_TAG in "${DOCKER_TAGGED_BUILDS[@]}"
|
||||
do
|
||||
echo pushing "$DOCKER_TAG"
|
||||
docker push "$DOCKER_TAG"
|
||||
done
|
||||
12
devops/build_extensions_debug.sh
Executable file
12
devops/build_extensions_debug.sh
Executable file
@@ -0,0 +1,12 @@
|
||||
#!/bin/bash
|
||||
|
||||
for f in tap/extensions/*; do
|
||||
if [ -d "$f" ]; then
|
||||
extension=$(basename $f) && \
|
||||
cd tap/extensions/${extension} && \
|
||||
go build -gcflags="all=-N -l" -buildmode=plugin -o ../${extension}.so . && \
|
||||
cd ../../.. && \
|
||||
mkdir -p agent/build/extensions && \
|
||||
cp tap/extensions/${extension}.so agent/build/extensions
|
||||
fi
|
||||
done
|
||||
@@ -6,10 +6,10 @@ const (
|
||||
HostModeEnvVar = "HOST_MODE"
|
||||
NodeNameEnvVar = "NODE_NAME"
|
||||
TappedAddressesPerNodeDictEnvVar = "TAPPED_ADDRESSES_PER_HOST"
|
||||
MaxEntriesDBSizeBytesEnvVar = "MAX_ENTRIES_DB_BYTES"
|
||||
RulePolicyPath = "/app/enforce-policy/"
|
||||
RulePolicyFileName = "enforce-policy.yaml"
|
||||
ConfigDirPath = "/app/config/"
|
||||
ValidationRulesFileName = "validation-rules.yaml"
|
||||
ContractFileName = "contract-oas.yaml"
|
||||
ConfigFileName = "mizu-config.json"
|
||||
GoGCEnvVar = "GOGC"
|
||||
DefaultApiServerPort = 8899
|
||||
DebugModeEnvVar = "MIZU_DEBUG"
|
||||
|
||||
@@ -18,6 +18,11 @@ const (
|
||||
WebsocketMessageTypeOutboundLink WebSocketMessageType = "outboundLink"
|
||||
)
|
||||
|
||||
type MizuAgentConfig struct {
|
||||
TapTargetRegex SerializableRegexp `yaml:"tapTargetRegex"`
|
||||
MaxDBSizeBytes int64 `yaml:"maxDBSizeBytes"`
|
||||
}
|
||||
|
||||
type WebSocketMessageMetadata struct {
|
||||
MessageType WebSocketMessageType `json:"messageType,omitempty"`
|
||||
}
|
||||
|
||||
30
shared/serializable_regexp.go
Normal file
30
shared/serializable_regexp.go
Normal file
@@ -0,0 +1,30 @@
|
||||
package shared
|
||||
|
||||
import "regexp"
|
||||
|
||||
type SerializableRegexp struct {
|
||||
regexp.Regexp
|
||||
}
|
||||
|
||||
func CompileRegexToSerializableRegexp(expr string) (*SerializableRegexp, error) {
|
||||
re, err := regexp.Compile(expr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &SerializableRegexp{*re}, nil
|
||||
}
|
||||
|
||||
// UnmarshalText is by json.Unmarshal.
|
||||
func (r *SerializableRegexp) UnmarshalText(text []byte) error {
|
||||
rr, err := CompileRegexToSerializableRegexp(string(text))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*r = *rr
|
||||
return nil
|
||||
}
|
||||
|
||||
// MarshalText is used by json.Marshal.
|
||||
func (r *SerializableRegexp) MarshalText() ([]byte, error) {
|
||||
return []byte(r.String()), nil
|
||||
}
|
||||
@@ -48,7 +48,7 @@ func (cl *Cleaner) start() {
|
||||
go func() {
|
||||
ticker := time.NewTicker(cl.cleanPeriod)
|
||||
|
||||
for true {
|
||||
for {
|
||||
<-ticker.C
|
||||
cl.clean()
|
||||
}
|
||||
|
||||
76
tap/diagnose/diagnose.go
Normal file
76
tap/diagnose/diagnose.go
Normal file
@@ -0,0 +1,76 @@
|
||||
package diagnose
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"runtime"
|
||||
"runtime/pprof"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
var AppStats = api.AppStats{}
|
||||
|
||||
func StartMemoryProfiler(envDumpPath string, envTimeInterval string) {
|
||||
dumpPath := "/app/pprof"
|
||||
if envDumpPath != "" {
|
||||
dumpPath = envDumpPath
|
||||
}
|
||||
timeInterval := 60
|
||||
if envTimeInterval != "" {
|
||||
if i, err := strconv.Atoi(envTimeInterval); err == nil {
|
||||
timeInterval = i
|
||||
}
|
||||
}
|
||||
|
||||
logger.Log.Info("Profiling is on, results will be written to %s", dumpPath)
|
||||
go func() {
|
||||
if _, err := os.Stat(dumpPath); os.IsNotExist(err) {
|
||||
if err := os.Mkdir(dumpPath, 0777); err != nil {
|
||||
logger.Log.Fatal("could not create directory for profile: ", err)
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
t := time.Now()
|
||||
|
||||
filename := fmt.Sprintf("%s/%s__mem.prof", dumpPath, t.Format("15_04_05"))
|
||||
|
||||
logger.Log.Infof("Writing memory profile to %s\n", filename)
|
||||
|
||||
f, err := os.Create(filename)
|
||||
if err != nil {
|
||||
logger.Log.Fatal("could not create memory profile: ", err)
|
||||
}
|
||||
runtime.GC() // get up-to-date statistics
|
||||
if err := pprof.WriteHeapProfile(f); err != nil {
|
||||
logger.Log.Fatal("could not write memory profile: ", err)
|
||||
}
|
||||
_ = f.Close()
|
||||
time.Sleep(time.Second * time.Duration(timeInterval))
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func DumpMemoryProfile(filename string) error {
|
||||
if filename == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
f, err := os.Create(filename)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer f.Close()
|
||||
|
||||
if err := pprof.WriteHeapProfile(f); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
85
tap/diagnose/errors_map.go
Normal file
85
tap/diagnose/errors_map.go
Normal file
@@ -0,0 +1,85 @@
|
||||
package diagnose
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/google/gopacket/examples/util"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
)
|
||||
|
||||
var TapErrors *errorsMap
|
||||
|
||||
type errorsMap struct {
|
||||
errorsMap map[string]uint
|
||||
OutputLevel int
|
||||
ErrorsCount uint
|
||||
errorsMapMutex sync.Mutex
|
||||
}
|
||||
|
||||
func InitializeErrorsMap(debug bool, verbose bool, quiet bool) {
|
||||
var outputLevel int
|
||||
|
||||
defer util.Run()()
|
||||
if debug {
|
||||
outputLevel = 2
|
||||
} else if verbose {
|
||||
outputLevel = 1
|
||||
} else if quiet {
|
||||
outputLevel = -1
|
||||
}
|
||||
|
||||
TapErrors = newErrorsMap(outputLevel)
|
||||
}
|
||||
|
||||
func newErrorsMap(outputLevel int) *errorsMap {
|
||||
return &errorsMap{
|
||||
errorsMap: make(map[string]uint),
|
||||
OutputLevel: outputLevel,
|
||||
}
|
||||
}
|
||||
|
||||
/* minOutputLevel: Error will be printed only if outputLevel is above this value
|
||||
* t: key for errorsMap (counting errors)
|
||||
* s, a: arguments logger.Log.Infof
|
||||
* Note: Too bad for perf that a... is evaluated
|
||||
*/
|
||||
func (e *errorsMap) logError(minOutputLevel int, t string, s string, a ...interface{}) {
|
||||
e.errorsMapMutex.Lock()
|
||||
e.ErrorsCount++
|
||||
nb := e.errorsMap[t]
|
||||
e.errorsMap[t] = nb + 1
|
||||
e.errorsMapMutex.Unlock()
|
||||
|
||||
if e.OutputLevel >= minOutputLevel {
|
||||
formatStr := fmt.Sprintf("%s: %s", t, s)
|
||||
logger.Log.Errorf(formatStr, a...)
|
||||
}
|
||||
}
|
||||
|
||||
func (e *errorsMap) Error(t string, s string, a ...interface{}) {
|
||||
e.logError(0, t, s, a...)
|
||||
}
|
||||
|
||||
func (e *errorsMap) SilentError(t string, s string, a ...interface{}) {
|
||||
e.logError(2, t, s, a...)
|
||||
}
|
||||
|
||||
func (e *errorsMap) Debug(s string, a ...interface{}) {
|
||||
logger.Log.Debugf(s, a...)
|
||||
}
|
||||
|
||||
func (e *errorsMap) GetErrorsSummary() (int, string) {
|
||||
e.errorsMapMutex.Lock()
|
||||
errorMapLen := len(e.errorsMap)
|
||||
errorsSummery := fmt.Sprintf("%v", e.errorsMap)
|
||||
e.errorsMapMutex.Unlock()
|
||||
return errorMapLen, errorsSummery
|
||||
}
|
||||
|
||||
func (e *errorsMap) PrintSummary() {
|
||||
logger.Log.Infof("Errors: %d", e.ErrorsCount)
|
||||
for t := range e.errorsMap {
|
||||
logger.Log.Infof(" %s:\t\t%d", e, e.errorsMap[t])
|
||||
}
|
||||
}
|
||||
46
tap/diagnose/internal_stats.go
Normal file
46
tap/diagnose/internal_stats.go
Normal file
@@ -0,0 +1,46 @@
|
||||
package diagnose
|
||||
|
||||
import "github.com/up9inc/mizu/shared/logger"
|
||||
|
||||
type tapperInternalStats struct {
|
||||
Ipdefrag int
|
||||
MissedBytes int
|
||||
Pkt int
|
||||
Sz int
|
||||
Totalsz int
|
||||
RejectFsm int
|
||||
RejectOpt int
|
||||
RejectConnFsm int
|
||||
Reassembled int
|
||||
OutOfOrderBytes int
|
||||
OutOfOrderPackets int
|
||||
BiggestChunkBytes int
|
||||
BiggestChunkPackets int
|
||||
OverlapBytes int
|
||||
OverlapPackets int
|
||||
}
|
||||
|
||||
var InternalStats *tapperInternalStats
|
||||
|
||||
func InitializeTapperInternalStats() {
|
||||
InternalStats = &tapperInternalStats{}
|
||||
}
|
||||
|
||||
func (stats *tapperInternalStats) PrintStatsSummary() {
|
||||
logger.Log.Infof("IPdefrag:\t\t%d", stats.Ipdefrag)
|
||||
logger.Log.Infof("TCP stats:")
|
||||
logger.Log.Infof(" missed bytes:\t\t%d", stats.MissedBytes)
|
||||
logger.Log.Infof(" total packets:\t\t%d", stats.Pkt)
|
||||
logger.Log.Infof(" rejected FSM:\t\t%d", stats.RejectFsm)
|
||||
logger.Log.Infof(" rejected Options:\t%d", stats.RejectOpt)
|
||||
logger.Log.Infof(" reassembled bytes:\t%d", stats.Sz)
|
||||
logger.Log.Infof(" total TCP bytes:\t%d", stats.Totalsz)
|
||||
logger.Log.Infof(" conn rejected FSM:\t%d", stats.RejectConnFsm)
|
||||
logger.Log.Infof(" reassembled chunks:\t%d", stats.Reassembled)
|
||||
logger.Log.Infof(" out-of-order packets:\t%d", stats.OutOfOrderPackets)
|
||||
logger.Log.Infof(" out-of-order bytes:\t%d", stats.OutOfOrderBytes)
|
||||
logger.Log.Infof(" biggest-chunk packets:\t%d", stats.BiggestChunkPackets)
|
||||
logger.Log.Infof(" biggest-chunk bytes:\t%d", stats.BiggestChunkBytes)
|
||||
logger.Log.Infof(" overlap packets:\t%d", stats.OverlapPackets)
|
||||
logger.Log.Infof(" overlap bytes:\t\t%d", stats.OverlapBytes)
|
||||
}
|
||||
@@ -4,7 +4,9 @@ go 1.16
|
||||
|
||||
require (
|
||||
github.com/bradleyfalzon/tlsx v0.0.0-20170624122154-28fd0e59bac4
|
||||
github.com/go-errors/errors v1.4.1
|
||||
github.com/google/gopacket v1.1.19
|
||||
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
|
||||
github.com/up9inc/mizu/shared v0.0.0
|
||||
github.com/up9inc/mizu/tap/api v0.0.0
|
||||
golang.org/x/net v0.0.0-20210224082022-3d97a244fca7 // indirect
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
github.com/bradleyfalzon/tlsx v0.0.0-20170624122154-28fd0e59bac4 h1:NJOOlc6ZJjix0A1rAU+nxruZtR8KboG1848yqpIUo4M=
|
||||
github.com/bradleyfalzon/tlsx v0.0.0-20170624122154-28fd0e59bac4/go.mod h1:DQPxZS994Ld1Y8uwnJT+dRL04XPD0cElP/pHH/zEBHM=
|
||||
github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
|
||||
github.com/go-errors/errors v1.4.1 h1:IvVlgbzSsaUNudsw5dcXSzF3EWyXTi5XrAdngnuhRyg=
|
||||
github.com/go-errors/errors v1.4.1/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og=
|
||||
github.com/golang-jwt/jwt/v4 v4.1.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg=
|
||||
github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8=
|
||||
github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo=
|
||||
|
||||
@@ -3,6 +3,8 @@ package tap
|
||||
import (
|
||||
"net"
|
||||
"strings"
|
||||
|
||||
"github.com/up9inc/mizu/tap/diagnose"
|
||||
)
|
||||
|
||||
var privateIPBlocks []*net.IPNet
|
||||
@@ -27,6 +29,7 @@ func getLocalhostIPs() ([]string, error) {
|
||||
return myIPs, nil
|
||||
}
|
||||
|
||||
//lint:ignore U1000 will be used in the future
|
||||
func isPrivateIP(ipStr string) bool {
|
||||
ip := net.ParseIP(ipStr)
|
||||
if ip.IsLoopback() || ip.IsLinkLocalUnicast() || ip.IsLinkLocalMulticast() {
|
||||
@@ -54,7 +57,7 @@ func initPrivateIPBlocks() {
|
||||
} {
|
||||
_, block, err := net.ParseCIDR(cidr)
|
||||
if err != nil {
|
||||
Error("Private-IP-Block-Parse", "parse error on %q: %v", cidr, err)
|
||||
diagnose.TapErrors.Error("Private-IP-Block-Parse", "parse error on %q: %v", cidr, err)
|
||||
} else {
|
||||
privateIPBlocks = append(privateIPBlocks, block)
|
||||
}
|
||||
|
||||
@@ -7,11 +7,11 @@ const (
|
||||
)
|
||||
|
||||
type OutboundLink struct {
|
||||
Src string
|
||||
DstIP string
|
||||
DstPort int
|
||||
Src string
|
||||
DstIP string
|
||||
DstPort int
|
||||
SuggestedResolvedName string
|
||||
SuggestedProtocol OutboundLinkProtocol
|
||||
SuggestedProtocol OutboundLinkProtocol
|
||||
}
|
||||
|
||||
func NewOutboundLinkWriter() *OutboundLinkWriter {
|
||||
@@ -26,11 +26,11 @@ type OutboundLinkWriter struct {
|
||||
|
||||
func (olw *OutboundLinkWriter) WriteOutboundLink(src string, DstIP string, DstPort int, SuggestedResolvedName string, SuggestedProtocol OutboundLinkProtocol) {
|
||||
olw.OutChan <- &OutboundLink{
|
||||
Src: src,
|
||||
DstIP: DstIP,
|
||||
DstPort: DstPort,
|
||||
Src: src,
|
||||
DstIP: DstIP,
|
||||
DstPort: DstPort,
|
||||
SuggestedResolvedName: SuggestedResolvedName,
|
||||
SuggestedProtocol: SuggestedProtocol,
|
||||
SuggestedProtocol: SuggestedProtocol,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -9,33 +9,22 @@
|
||||
package tap
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"os/signal"
|
||||
"runtime"
|
||||
_debug "runtime/debug"
|
||||
"runtime/pprof"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/gopacket"
|
||||
"github.com/google/gopacket/examples/util"
|
||||
"github.com/google/gopacket/ip4defrag"
|
||||
"github.com/google/gopacket/layers" // pulls in all layers decoders
|
||||
"github.com/google/gopacket/pcap"
|
||||
"github.com/google/gopacket/reassembly"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
"github.com/up9inc/mizu/tap/diagnose"
|
||||
"github.com/up9inc/mizu/tap/source"
|
||||
)
|
||||
|
||||
const cleanPeriod = time.Second * 10
|
||||
|
||||
//lint:ignore U1000 will be used in the future
|
||||
var remoteOnlyOutboundPorts = []int{80, 443}
|
||||
|
||||
var maxcount = flag.Int64("c", -1, "Only grab this many packets, then exit")
|
||||
@@ -62,69 +51,14 @@ var staleTimeoutSeconds = flag.Int("staletimout", 120, "Max time in seconds to k
|
||||
|
||||
var memprofile = flag.String("memprofile", "", "Write memory profile")
|
||||
|
||||
var appStats = api.AppStats{}
|
||||
|
||||
// global
|
||||
var stats struct {
|
||||
ipdefrag int
|
||||
missedBytes int
|
||||
pkt int
|
||||
sz int
|
||||
totalsz int
|
||||
rejectFsm int
|
||||
rejectOpt int
|
||||
rejectConnFsm int
|
||||
reassembled int
|
||||
outOfOrderBytes int
|
||||
outOfOrderPackets int
|
||||
biggestChunkBytes int
|
||||
biggestChunkPackets int
|
||||
overlapBytes int
|
||||
overlapPackets int
|
||||
}
|
||||
|
||||
type TapOpts struct {
|
||||
HostMode bool
|
||||
}
|
||||
|
||||
var outputLevel int
|
||||
var errorsMap map[string]uint
|
||||
var errorsMapMutex sync.Mutex
|
||||
var nErrors uint
|
||||
var ownIps []string // global
|
||||
var hostMode bool // global
|
||||
var extensions []*api.Extension // global
|
||||
var filteringOptions *api.TrafficFilteringOptions // global
|
||||
|
||||
const baseStreamChannelTimeoutMs int = 5000 * 100
|
||||
|
||||
/* minOutputLevel: Error will be printed only if outputLevel is above this value
|
||||
* t: key for errorsMap (counting errors)
|
||||
* s, a: arguments logger.Log.Infof
|
||||
* Note: Too bad for perf that a... is evaluated
|
||||
*/
|
||||
func logError(minOutputLevel int, t string, s string, a ...interface{}) {
|
||||
errorsMapMutex.Lock()
|
||||
nErrors++
|
||||
nb, _ := errorsMap[t]
|
||||
errorsMap[t] = nb + 1
|
||||
errorsMapMutex.Unlock()
|
||||
|
||||
if outputLevel >= minOutputLevel {
|
||||
formatStr := fmt.Sprintf("%s: %s", t, s)
|
||||
logger.Log.Errorf(formatStr, a...)
|
||||
}
|
||||
}
|
||||
func Error(t string, s string, a ...interface{}) {
|
||||
logError(0, t, s, a...)
|
||||
}
|
||||
func SilentError(t string, s string, a ...interface{}) {
|
||||
logError(2, t, s, a...)
|
||||
}
|
||||
func Debug(s string, a ...interface{}) {
|
||||
logger.Log.Debugf(s, a...)
|
||||
}
|
||||
|
||||
func inArrayInt(arr []int, valueToCheck int) bool {
|
||||
for _, value := range arr {
|
||||
if value == valueToCheck {
|
||||
@@ -143,397 +77,121 @@ func inArrayString(arr []string, valueToCheck string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// Context
|
||||
// The assembler context
|
||||
type Context struct {
|
||||
CaptureInfo gopacket.CaptureInfo
|
||||
}
|
||||
|
||||
func GetStats() api.AppStats {
|
||||
return appStats
|
||||
}
|
||||
|
||||
func (c *Context) GetCaptureInfo() gopacket.CaptureInfo {
|
||||
return c.CaptureInfo
|
||||
}
|
||||
|
||||
func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem, extensionsRef []*api.Extension, options *api.TrafficFilteringOptions) {
|
||||
hostMode = opts.HostMode
|
||||
extensions = extensionsRef
|
||||
filteringOptions = options
|
||||
|
||||
if GetMemoryProfilingEnabled() {
|
||||
startMemoryProfiler()
|
||||
diagnose.StartMemoryProfiler(os.Getenv(MemoryProfilingDumpPath), os.Getenv(MemoryProfilingTimeIntervalSeconds))
|
||||
}
|
||||
|
||||
go startPassiveTapper(outputItems)
|
||||
}
|
||||
|
||||
func startMemoryProfiler() {
|
||||
dumpPath := "/app/pprof"
|
||||
envDumpPath := os.Getenv(MemoryProfilingDumpPath)
|
||||
if envDumpPath != "" {
|
||||
dumpPath = envDumpPath
|
||||
}
|
||||
timeInterval := 60
|
||||
envTimeInterval := os.Getenv(MemoryProfilingTimeIntervalSeconds)
|
||||
if envTimeInterval != "" {
|
||||
if i, err := strconv.Atoi(envTimeInterval); err == nil {
|
||||
timeInterval = i
|
||||
}
|
||||
}
|
||||
func printPeriodicStats(cleaner *Cleaner) {
|
||||
statsPeriod := time.Second * time.Duration(*statsevery)
|
||||
ticker := time.NewTicker(statsPeriod)
|
||||
|
||||
logger.Log.Info("Profiling is on, results will be written to %s", dumpPath)
|
||||
go func() {
|
||||
if _, err := os.Stat(dumpPath); os.IsNotExist(err) {
|
||||
if err := os.Mkdir(dumpPath, 0777); err != nil {
|
||||
logger.Log.Fatal("could not create directory for profile: ", err)
|
||||
}
|
||||
}
|
||||
|
||||
for true {
|
||||
t := time.Now()
|
||||
|
||||
filename := fmt.Sprintf("%s/%s__mem.prof", dumpPath, t.Format("15_04_05"))
|
||||
|
||||
logger.Log.Infof("Writing memory profile to %s\n", filename)
|
||||
|
||||
f, err := os.Create(filename)
|
||||
if err != nil {
|
||||
logger.Log.Fatal("could not create memory profile: ", err)
|
||||
}
|
||||
runtime.GC() // get up-to-date statistics
|
||||
if err := pprof.WriteHeapProfile(f); err != nil {
|
||||
logger.Log.Fatal("could not write memory profile: ", err)
|
||||
}
|
||||
_ = f.Close()
|
||||
time.Sleep(time.Second * time.Duration(timeInterval))
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func closeTimedoutTcpStreamChannels() {
|
||||
TcpStreamChannelTimeoutMs := GetTcpChannelTimeoutMs()
|
||||
for {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
_debug.FreeOSMemory()
|
||||
streams.Range(func(key interface{}, value interface{}) bool {
|
||||
streamWrapper := value.(*tcpStreamWrapper)
|
||||
stream := streamWrapper.stream
|
||||
if stream.superIdentifier.Protocol == nil {
|
||||
if !stream.isClosed && time.Now().After(streamWrapper.createdAt.Add(TcpStreamChannelTimeoutMs)) {
|
||||
stream.Close()
|
||||
appStats.IncDroppedTcpStreams()
|
||||
logger.Log.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d\n", appStats.DroppedTcpStreams, runtime.NumGoroutine(), TcpStreamChannelTimeoutMs/1000000)
|
||||
}
|
||||
} else {
|
||||
if !stream.superIdentifier.IsClosedOthers {
|
||||
for i := range stream.clients {
|
||||
reader := &stream.clients[i]
|
||||
if reader.extension.Protocol != stream.superIdentifier.Protocol {
|
||||
reader.Close()
|
||||
}
|
||||
}
|
||||
for i := range stream.servers {
|
||||
reader := &stream.servers[i]
|
||||
if reader.extension.Protocol != stream.superIdentifier.Protocol {
|
||||
reader.Close()
|
||||
}
|
||||
}
|
||||
stream.superIdentifier.IsClosedOthers = true
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
<-ticker.C
|
||||
|
||||
// Since the start
|
||||
errorMapLen, errorsSummery := diagnose.TapErrors.GetErrorsSummary()
|
||||
|
||||
logger.Log.Infof("%v (errors: %v, errTypes:%v) - Errors Summary: %s",
|
||||
time.Since(diagnose.AppStats.StartTime),
|
||||
diagnose.TapErrors.ErrorsCount,
|
||||
errorMapLen,
|
||||
errorsSummery,
|
||||
)
|
||||
|
||||
// At this moment
|
||||
memStats := runtime.MemStats{}
|
||||
runtime.ReadMemStats(&memStats)
|
||||
logger.Log.Infof(
|
||||
"mem: %d, goroutines: %d",
|
||||
memStats.HeapAlloc,
|
||||
runtime.NumGoroutine(),
|
||||
)
|
||||
|
||||
// Since the last print
|
||||
cleanStats := cleaner.dumpStats()
|
||||
logger.Log.Infof(
|
||||
"cleaner - flushed connections: %d, closed connections: %d, deleted messages: %d",
|
||||
cleanStats.flushed,
|
||||
cleanStats.closed,
|
||||
cleanStats.deleted,
|
||||
)
|
||||
currentAppStats := diagnose.AppStats.DumpStats()
|
||||
appStatsJSON, _ := json.Marshal(currentAppStats)
|
||||
logger.Log.Infof("app stats - %v", string(appStatsJSON))
|
||||
}
|
||||
}
|
||||
|
||||
func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
||||
go closeTimedoutTcpStreamChannels()
|
||||
streamsMap := NewTcpStreamMap()
|
||||
go streamsMap.closeTimedoutTcpStreamChannels()
|
||||
|
||||
defer util.Run()()
|
||||
if *debug {
|
||||
outputLevel = 2
|
||||
} else if *verbose {
|
||||
outputLevel = 1
|
||||
} else if *quiet {
|
||||
outputLevel = -1
|
||||
}
|
||||
errorsMap = make(map[string]uint)
|
||||
diagnose.InitializeErrorsMap(*debug, *verbose, *quiet)
|
||||
diagnose.InitializeTapperInternalStats()
|
||||
|
||||
if localhostIPs, err := getLocalhostIPs(); err != nil {
|
||||
// TODO: think this over
|
||||
logger.Log.Info("Failed to get self IP addresses")
|
||||
logger.Log.Errorf("Getting-Self-Address", "Error getting self ip address: %s (%v,%+v)", err, err, err)
|
||||
ownIps = make([]string, 0)
|
||||
} else {
|
||||
ownIps = localhostIPs
|
||||
}
|
||||
|
||||
var handle *pcap.Handle
|
||||
var err error
|
||||
if *fname != "" {
|
||||
if handle, err = pcap.OpenOffline(*fname); err != nil {
|
||||
logger.Log.Fatalf("PCAP OpenOffline error: %v", err)
|
||||
}
|
||||
} else {
|
||||
// This is a little complicated because we want to allow all possible options
|
||||
// for creating the packet capture handle... instead of all this you can
|
||||
// just call pcap.OpenLive if you want a simple handle.
|
||||
inactive, err := pcap.NewInactiveHandle(*iface)
|
||||
if err != nil {
|
||||
logger.Log.Fatalf("could not create: %v", err)
|
||||
}
|
||||
defer inactive.CleanUp()
|
||||
if err = inactive.SetSnapLen(*snaplen); err != nil {
|
||||
logger.Log.Fatalf("could not set snap length: %v", err)
|
||||
} else if err = inactive.SetPromisc(*promisc); err != nil {
|
||||
logger.Log.Fatalf("could not set promisc mode: %v", err)
|
||||
} else if err = inactive.SetTimeout(time.Second); err != nil {
|
||||
logger.Log.Fatalf("could not set timeout: %v", err)
|
||||
}
|
||||
if *tstype != "" {
|
||||
if t, err := pcap.TimestampSourceFromString(*tstype); err != nil {
|
||||
logger.Log.Fatalf("Supported timestamp types: %v", inactive.SupportedTimestamps())
|
||||
} else if err := inactive.SetTimestampSource(t); err != nil {
|
||||
logger.Log.Fatalf("Supported timestamp types: %v", inactive.SupportedTimestamps())
|
||||
}
|
||||
}
|
||||
if handle, err = inactive.Activate(); err != nil {
|
||||
logger.Log.Fatalf("PCAP Activate error: %v", err)
|
||||
}
|
||||
defer handle.Close()
|
||||
}
|
||||
var bpffilter string
|
||||
if len(flag.Args()) > 0 {
|
||||
bpffilter := strings.Join(flag.Args(), " ")
|
||||
logger.Log.Infof("Using BPF filter %q", bpffilter)
|
||||
if err = handle.SetBPFFilter(bpffilter); err != nil {
|
||||
logger.Log.Fatalf("BPF filter error: %v", err)
|
||||
}
|
||||
bpffilter = strings.Join(flag.Args(), " ")
|
||||
}
|
||||
|
||||
var dec gopacket.Decoder
|
||||
var ok bool
|
||||
decoderName := *decoder
|
||||
if decoderName == "" {
|
||||
decoderName = fmt.Sprintf("%s", handle.LinkType())
|
||||
packetSource, err := source.NewTcpPacketSource(*fname, *iface, source.TcpPacketSourceBehaviour{
|
||||
SnapLength: *snaplen,
|
||||
Promisc: *promisc,
|
||||
Tstype: *tstype,
|
||||
DecoderName: *decoder,
|
||||
Lazy: *lazy,
|
||||
BpfFilter: bpffilter,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Fatal(err)
|
||||
}
|
||||
if dec, ok = gopacket.DecodersByLayerName[decoderName]; !ok {
|
||||
logger.Log.Fatal("No decoder named", decoderName)
|
||||
|
||||
defer packetSource.Close()
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Fatal(err)
|
||||
}
|
||||
source := gopacket.NewPacketSource(handle, dec)
|
||||
source.Lazy = *lazy
|
||||
source.NoCopy = true
|
||||
|
||||
packets := make(chan source.TcpPacketInfo)
|
||||
assembler := NewTcpAssembler(outputItems, streamsMap)
|
||||
|
||||
logger.Log.Info("Starting to read packets")
|
||||
appStats.SetStartTime(time.Now())
|
||||
defragger := ip4defrag.NewIPv4Defragmenter()
|
||||
diagnose.AppStats.SetStartTime(time.Now())
|
||||
|
||||
var emitter api.Emitter = &api.Emitting{
|
||||
AppStats: &appStats,
|
||||
OutputChannel: outputItems,
|
||||
}
|
||||
|
||||
streamFactory := &tcpStreamFactory{
|
||||
Emitter: emitter,
|
||||
}
|
||||
streamPool := reassembly.NewStreamPool(streamFactory)
|
||||
assembler := reassembly.NewAssembler(streamPool)
|
||||
|
||||
maxBufferedPagesTotal := GetMaxBufferedPagesPerConnection()
|
||||
maxBufferedPagesPerConnection := GetMaxBufferedPagesTotal()
|
||||
logger.Log.Infof("Assembler options: maxBufferedPagesTotal=%d, maxBufferedPagesPerConnection=%d", maxBufferedPagesTotal, maxBufferedPagesPerConnection)
|
||||
assembler.AssemblerOptions.MaxBufferedPagesTotal = maxBufferedPagesTotal
|
||||
assembler.AssemblerOptions.MaxBufferedPagesPerConnection = maxBufferedPagesPerConnection
|
||||
|
||||
var assemblerMutex sync.Mutex
|
||||
|
||||
signalChan := make(chan os.Signal, 1)
|
||||
signal.Notify(signalChan, os.Interrupt)
|
||||
go packetSource.ReadPackets(!*nodefrag, packets)
|
||||
|
||||
staleConnectionTimeout := time.Second * time.Duration(*staleTimeoutSeconds)
|
||||
cleaner := Cleaner{
|
||||
assembler: assembler,
|
||||
assemblerMutex: &assemblerMutex,
|
||||
assembler: assembler.Assembler,
|
||||
assemblerMutex: &assembler.assemblerMutex,
|
||||
cleanPeriod: cleanPeriod,
|
||||
connectionTimeout: staleConnectionTimeout,
|
||||
}
|
||||
cleaner.start()
|
||||
|
||||
go func() {
|
||||
statsPeriod := time.Second * time.Duration(*statsevery)
|
||||
ticker := time.NewTicker(statsPeriod)
|
||||
go printPeriodicStats(&cleaner)
|
||||
|
||||
for true {
|
||||
<-ticker.C
|
||||
assembler.processPackets(*hexdumppkt, packets)
|
||||
|
||||
// Since the start
|
||||
errorsMapMutex.Lock()
|
||||
errorMapLen := len(errorsMap)
|
||||
errorsSummery := fmt.Sprintf("%v", errorsMap)
|
||||
errorsMapMutex.Unlock()
|
||||
logger.Log.Infof("%v (errors: %v, errTypes:%v) - Errors Summary: %s",
|
||||
time.Since(appStats.StartTime),
|
||||
nErrors,
|
||||
errorMapLen,
|
||||
errorsSummery,
|
||||
)
|
||||
|
||||
// At this moment
|
||||
memStats := runtime.MemStats{}
|
||||
runtime.ReadMemStats(&memStats)
|
||||
logger.Log.Infof(
|
||||
"mem: %d, goroutines: %d",
|
||||
memStats.HeapAlloc,
|
||||
runtime.NumGoroutine(),
|
||||
)
|
||||
|
||||
// Since the last print
|
||||
cleanStats := cleaner.dumpStats()
|
||||
logger.Log.Infof(
|
||||
"cleaner - flushed connections: %d, closed connections: %d, deleted messages: %d",
|
||||
cleanStats.flushed,
|
||||
cleanStats.closed,
|
||||
cleanStats.deleted,
|
||||
)
|
||||
currentAppStats := appStats.DumpStats()
|
||||
appStatsJSON, _ := json.Marshal(currentAppStats)
|
||||
logger.Log.Infof("app stats - %v", string(appStatsJSON))
|
||||
}
|
||||
}()
|
||||
|
||||
if GetMemoryProfilingEnabled() {
|
||||
startMemoryProfiler()
|
||||
if diagnose.TapErrors.OutputLevel >= 2 {
|
||||
assembler.dumpStreamPool()
|
||||
}
|
||||
|
||||
for {
|
||||
packet, err := source.NextPacket()
|
||||
if err == io.EOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
logger.Log.Debugf("Error: %v", err)
|
||||
continue
|
||||
}
|
||||
packetsCount := appStats.IncPacketsCount()
|
||||
logger.Log.Debugf("PACKET #%d", packetsCount)
|
||||
data := packet.Data()
|
||||
appStats.UpdateProcessedBytes(uint64(len(data)))
|
||||
if *hexdumppkt {
|
||||
logger.Log.Debugf("Packet content (%d/0x%x) - %s", len(data), len(data), hex.Dump(data))
|
||||
}
|
||||
|
||||
// defrag the IPv4 packet if required
|
||||
if !*nodefrag {
|
||||
ip4Layer := packet.Layer(layers.LayerTypeIPv4)
|
||||
if ip4Layer == nil {
|
||||
continue
|
||||
}
|
||||
ip4 := ip4Layer.(*layers.IPv4)
|
||||
l := ip4.Length
|
||||
newip4, err := defragger.DefragIPv4(ip4)
|
||||
if err != nil {
|
||||
logger.Log.Fatal("Error while de-fragmenting", err)
|
||||
} else if newip4 == nil {
|
||||
logger.Log.Debugf("Fragment...")
|
||||
continue // packet fragment, we don't have whole packet yet.
|
||||
}
|
||||
if newip4.Length != l {
|
||||
stats.ipdefrag++
|
||||
logger.Log.Debugf("Decoding re-assembled packet: %s", newip4.NextLayerType())
|
||||
pb, ok := packet.(gopacket.PacketBuilder)
|
||||
if !ok {
|
||||
logger.Log.Panic("Not a PacketBuilder")
|
||||
}
|
||||
nextDecoder := newip4.NextLayerType()
|
||||
_ = nextDecoder.Decode(newip4.Payload, pb)
|
||||
}
|
||||
}
|
||||
|
||||
tcp := packet.Layer(layers.LayerTypeTCP)
|
||||
if tcp != nil {
|
||||
appStats.IncTcpPacketsCount()
|
||||
tcp := tcp.(*layers.TCP)
|
||||
if *checksum {
|
||||
err := tcp.SetNetworkLayerForChecksum(packet.NetworkLayer())
|
||||
if err != nil {
|
||||
logger.Log.Fatalf("Failed to set network layer for checksum: %s\n", err)
|
||||
}
|
||||
}
|
||||
c := Context{
|
||||
CaptureInfo: packet.Metadata().CaptureInfo,
|
||||
}
|
||||
stats.totalsz += len(tcp.Payload)
|
||||
logger.Log.Debugf("%s : %v -> %s : %v", packet.NetworkLayer().NetworkFlow().Src(), tcp.SrcPort, packet.NetworkLayer().NetworkFlow().Dst(), tcp.DstPort)
|
||||
assemblerMutex.Lock()
|
||||
assembler.AssembleWithContext(packet.NetworkLayer().NetworkFlow(), tcp, &c)
|
||||
assemblerMutex.Unlock()
|
||||
}
|
||||
|
||||
done := *maxcount > 0 && int64(appStats.PacketsCount) >= *maxcount
|
||||
if done {
|
||||
errorsMapMutex.Lock()
|
||||
errorMapLen := len(errorsMap)
|
||||
errorsMapMutex.Unlock()
|
||||
logger.Log.Infof("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)",
|
||||
appStats.PacketsCount,
|
||||
appStats.ProcessedBytes,
|
||||
time.Since(appStats.StartTime),
|
||||
nErrors,
|
||||
errorMapLen)
|
||||
}
|
||||
select {
|
||||
case <-signalChan:
|
||||
logger.Log.Infof("Caught SIGINT: aborting")
|
||||
done = true
|
||||
default:
|
||||
// NOP: continue
|
||||
}
|
||||
if done {
|
||||
break
|
||||
}
|
||||
if err := diagnose.DumpMemoryProfile(*memprofile); err != nil {
|
||||
logger.Log.Errorf("Error dumping memory profile %v\n", err)
|
||||
}
|
||||
|
||||
assemblerMutex.Lock()
|
||||
closed := assembler.FlushAll()
|
||||
assemblerMutex.Unlock()
|
||||
logger.Log.Debugf("Final flush: %d closed", closed)
|
||||
if outputLevel >= 2 {
|
||||
streamPool.Dump()
|
||||
}
|
||||
assembler.waitAndDump()
|
||||
|
||||
if *memprofile != "" {
|
||||
f, err := os.Create(*memprofile)
|
||||
if err != nil {
|
||||
logger.Log.Fatal(err)
|
||||
}
|
||||
_ = pprof.WriteHeapProfile(f)
|
||||
_ = f.Close()
|
||||
}
|
||||
|
||||
streamFactory.WaitGoRoutines()
|
||||
assemblerMutex.Lock()
|
||||
logger.Log.Debugf("%s", assembler.Dump())
|
||||
assemblerMutex.Unlock()
|
||||
if !*nodefrag {
|
||||
logger.Log.Infof("IPdefrag:\t\t%d", stats.ipdefrag)
|
||||
}
|
||||
logger.Log.Infof("TCP stats:")
|
||||
logger.Log.Infof(" missed bytes:\t\t%d", stats.missedBytes)
|
||||
logger.Log.Infof(" total packets:\t\t%d", stats.pkt)
|
||||
logger.Log.Infof(" rejected FSM:\t\t%d", stats.rejectFsm)
|
||||
logger.Log.Infof(" rejected Options:\t%d", stats.rejectOpt)
|
||||
logger.Log.Infof(" reassembled bytes:\t%d", stats.sz)
|
||||
logger.Log.Infof(" total TCP bytes:\t%d", stats.totalsz)
|
||||
logger.Log.Infof(" conn rejected FSM:\t%d", stats.rejectConnFsm)
|
||||
logger.Log.Infof(" reassembled chunks:\t%d", stats.reassembled)
|
||||
logger.Log.Infof(" out-of-order packets:\t%d", stats.outOfOrderPackets)
|
||||
logger.Log.Infof(" out-of-order bytes:\t%d", stats.outOfOrderBytes)
|
||||
logger.Log.Infof(" biggest-chunk packets:\t%d", stats.biggestChunkPackets)
|
||||
logger.Log.Infof(" biggest-chunk bytes:\t%d", stats.biggestChunkBytes)
|
||||
logger.Log.Infof(" overlap packets:\t%d", stats.overlapPackets)
|
||||
logger.Log.Infof(" overlap bytes:\t\t%d", stats.overlapBytes)
|
||||
logger.Log.Infof("Errors: %d", nErrors)
|
||||
for e := range errorsMap {
|
||||
logger.Log.Infof(" %s:\t\t%d", e, errorsMap[e])
|
||||
}
|
||||
logger.Log.Infof("AppStats: %v", GetStats())
|
||||
diagnose.InternalStats.PrintStatsSummary()
|
||||
diagnose.TapErrors.PrintSummary()
|
||||
logger.Log.Infof("AppStats: %v", diagnose.AppStats)
|
||||
}
|
||||
|
||||
150
tap/source/tcp_packet_source.go
Normal file
150
tap/source/tcp_packet_source.go
Normal file
@@ -0,0 +1,150 @@
|
||||
package source
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/google/gopacket"
|
||||
"github.com/google/gopacket/ip4defrag"
|
||||
"github.com/google/gopacket/layers"
|
||||
"github.com/google/gopacket/pcap"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
"github.com/up9inc/mizu/tap/diagnose"
|
||||
)
|
||||
|
||||
type TcpPacketSource struct {
|
||||
source *gopacket.PacketSource
|
||||
handle *pcap.Handle
|
||||
defragger *ip4defrag.IPv4Defragmenter
|
||||
Behaviour *TcpPacketSourceBehaviour
|
||||
}
|
||||
|
||||
type TcpPacketSourceBehaviour struct {
|
||||
SnapLength int
|
||||
Promisc bool
|
||||
Tstype string
|
||||
DecoderName string
|
||||
Lazy bool
|
||||
BpfFilter string
|
||||
}
|
||||
|
||||
type TcpPacketInfo struct {
|
||||
Packet gopacket.Packet
|
||||
Source *TcpPacketSource
|
||||
}
|
||||
|
||||
func NewTcpPacketSource(filename string, interfaceName string,
|
||||
behaviour TcpPacketSourceBehaviour) (*TcpPacketSource, error) {
|
||||
var err error
|
||||
|
||||
result := &TcpPacketSource{
|
||||
defragger: ip4defrag.NewIPv4Defragmenter(),
|
||||
Behaviour: &behaviour,
|
||||
}
|
||||
|
||||
if filename != "" {
|
||||
if result.handle, err = pcap.OpenOffline(filename); err != nil {
|
||||
return result, fmt.Errorf("PCAP OpenOffline error: %v", err)
|
||||
}
|
||||
} else {
|
||||
// This is a little complicated because we want to allow all possible options
|
||||
// for creating the packet capture handle... instead of all this you can
|
||||
// just call pcap.OpenLive if you want a simple handle.
|
||||
inactive, err := pcap.NewInactiveHandle(interfaceName)
|
||||
if err != nil {
|
||||
return result, fmt.Errorf("could not create: %v", err)
|
||||
}
|
||||
defer inactive.CleanUp()
|
||||
if err = inactive.SetSnapLen(behaviour.SnapLength); err != nil {
|
||||
return result, fmt.Errorf("could not set snap length: %v", err)
|
||||
} else if err = inactive.SetPromisc(behaviour.Promisc); err != nil {
|
||||
return result, fmt.Errorf("could not set promisc mode: %v", err)
|
||||
} else if err = inactive.SetTimeout(time.Second); err != nil {
|
||||
return result, fmt.Errorf("could not set timeout: %v", err)
|
||||
}
|
||||
if behaviour.Tstype != "" {
|
||||
if t, err := pcap.TimestampSourceFromString(behaviour.Tstype); err != nil {
|
||||
return result, fmt.Errorf("supported timestamp types: %v", inactive.SupportedTimestamps())
|
||||
} else if err := inactive.SetTimestampSource(t); err != nil {
|
||||
return result, fmt.Errorf("supported timestamp types: %v", inactive.SupportedTimestamps())
|
||||
}
|
||||
}
|
||||
if result.handle, err = inactive.Activate(); err != nil {
|
||||
return result, fmt.Errorf("PCAP Activate error: %v", err)
|
||||
}
|
||||
}
|
||||
if behaviour.BpfFilter != "" {
|
||||
logger.Log.Infof("Using BPF filter %q", behaviour.BpfFilter)
|
||||
if err = result.handle.SetBPFFilter(behaviour.BpfFilter); err != nil {
|
||||
return nil, fmt.Errorf("BPF filter error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
var dec gopacket.Decoder
|
||||
var ok bool
|
||||
if behaviour.DecoderName == "" {
|
||||
behaviour.DecoderName = result.handle.LinkType().String()
|
||||
}
|
||||
if dec, ok = gopacket.DecodersByLayerName[behaviour.DecoderName]; !ok {
|
||||
return nil, fmt.Errorf("no decoder named %v", behaviour.DecoderName)
|
||||
}
|
||||
result.source = gopacket.NewPacketSource(result.handle, dec)
|
||||
result.source.Lazy = behaviour.Lazy
|
||||
result.source.NoCopy = true
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (source *TcpPacketSource) Close() {
|
||||
if source.handle != nil {
|
||||
source.handle.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func (source *TcpPacketSource) ReadPackets(ipdefrag bool, packets chan<- TcpPacketInfo) error {
|
||||
for {
|
||||
packet, err := source.source.NextPacket()
|
||||
|
||||
if err == io.EOF {
|
||||
return err
|
||||
} else if err != nil {
|
||||
if err.Error() != "Timeout Expired" {
|
||||
logger.Log.Debugf("Error: %T", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// defrag the IPv4 packet if required
|
||||
if !ipdefrag {
|
||||
ip4Layer := packet.Layer(layers.LayerTypeIPv4)
|
||||
if ip4Layer == nil {
|
||||
continue
|
||||
}
|
||||
ip4 := ip4Layer.(*layers.IPv4)
|
||||
l := ip4.Length
|
||||
newip4, err := source.defragger.DefragIPv4(ip4)
|
||||
if err != nil {
|
||||
logger.Log.Fatal("Error while de-fragmenting", err)
|
||||
} else if newip4 == nil {
|
||||
logger.Log.Debugf("Fragment...")
|
||||
continue // packet fragment, we don't have whole packet yet.
|
||||
}
|
||||
if newip4.Length != l {
|
||||
diagnose.InternalStats.Ipdefrag++
|
||||
logger.Log.Debugf("Decoding re-assembled packet: %s", newip4.NextLayerType())
|
||||
pb, ok := packet.(gopacket.PacketBuilder)
|
||||
if !ok {
|
||||
logger.Log.Panic("Not a PacketBuilder")
|
||||
}
|
||||
nextDecoder := newip4.NextLayerType()
|
||||
_ = nextDecoder.Decode(newip4.Payload, pb)
|
||||
}
|
||||
}
|
||||
|
||||
packets <- TcpPacketInfo{
|
||||
Packet: packet,
|
||||
Source: source,
|
||||
}
|
||||
}
|
||||
}
|
||||
132
tap/tcp_assembler.go
Normal file
132
tap/tcp_assembler.go
Normal file
@@ -0,0 +1,132 @@
|
||||
package tap
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"os"
|
||||
"os/signal"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/gopacket"
|
||||
"github.com/google/gopacket/layers"
|
||||
"github.com/google/gopacket/reassembly"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
"github.com/up9inc/mizu/tap/diagnose"
|
||||
"github.com/up9inc/mizu/tap/source"
|
||||
)
|
||||
|
||||
type tcpAssembler struct {
|
||||
*reassembly.Assembler
|
||||
streamPool *reassembly.StreamPool
|
||||
streamFactory *tcpStreamFactory
|
||||
assemblerMutex sync.Mutex
|
||||
}
|
||||
|
||||
// Context
|
||||
// The assembler context
|
||||
type context struct {
|
||||
CaptureInfo gopacket.CaptureInfo
|
||||
}
|
||||
|
||||
func (c *context) GetCaptureInfo() gopacket.CaptureInfo {
|
||||
return c.CaptureInfo
|
||||
}
|
||||
|
||||
func NewTcpAssembler(outputItems chan *api.OutputChannelItem, streamsMap *tcpStreamMap) *tcpAssembler {
|
||||
var emitter api.Emitter = &api.Emitting{
|
||||
AppStats: &diagnose.AppStats,
|
||||
OutputChannel: outputItems,
|
||||
}
|
||||
|
||||
streamFactory := NewTcpStreamFactory(emitter, streamsMap)
|
||||
streamPool := reassembly.NewStreamPool(streamFactory)
|
||||
assembler := reassembly.NewAssembler(streamPool)
|
||||
|
||||
maxBufferedPagesTotal := GetMaxBufferedPagesPerConnection()
|
||||
maxBufferedPagesPerConnection := GetMaxBufferedPagesTotal()
|
||||
logger.Log.Infof("Assembler options: maxBufferedPagesTotal=%d, maxBufferedPagesPerConnection=%d",
|
||||
maxBufferedPagesTotal, maxBufferedPagesPerConnection)
|
||||
assembler.AssemblerOptions.MaxBufferedPagesTotal = maxBufferedPagesTotal
|
||||
assembler.AssemblerOptions.MaxBufferedPagesPerConnection = maxBufferedPagesPerConnection
|
||||
|
||||
return &tcpAssembler{
|
||||
Assembler: assembler,
|
||||
streamPool: streamPool,
|
||||
streamFactory: streamFactory,
|
||||
}
|
||||
}
|
||||
|
||||
func (a *tcpAssembler) processPackets(dumpPacket bool, packets <-chan source.TcpPacketInfo) {
|
||||
signalChan := make(chan os.Signal, 1)
|
||||
signal.Notify(signalChan, os.Interrupt)
|
||||
|
||||
for packetInfo := range packets {
|
||||
packetsCount := diagnose.AppStats.IncPacketsCount()
|
||||
logger.Log.Debugf("PACKET #%d", packetsCount)
|
||||
packet := packetInfo.Packet
|
||||
data := packet.Data()
|
||||
diagnose.AppStats.UpdateProcessedBytes(uint64(len(data)))
|
||||
if dumpPacket {
|
||||
logger.Log.Debugf("Packet content (%d/0x%x) - %s", len(data), len(data), hex.Dump(data))
|
||||
}
|
||||
|
||||
tcp := packet.Layer(layers.LayerTypeTCP)
|
||||
if tcp != nil {
|
||||
diagnose.AppStats.IncTcpPacketsCount()
|
||||
tcp := tcp.(*layers.TCP)
|
||||
if *checksum {
|
||||
err := tcp.SetNetworkLayerForChecksum(packet.NetworkLayer())
|
||||
if err != nil {
|
||||
logger.Log.Fatalf("Failed to set network layer for checksum: %s\n", err)
|
||||
}
|
||||
}
|
||||
c := context{
|
||||
CaptureInfo: packet.Metadata().CaptureInfo,
|
||||
}
|
||||
diagnose.InternalStats.Totalsz += len(tcp.Payload)
|
||||
logger.Log.Debugf("%s : %v -> %s : %v", packet.NetworkLayer().NetworkFlow().Src(), tcp.SrcPort, packet.NetworkLayer().NetworkFlow().Dst(), tcp.DstPort)
|
||||
a.assemblerMutex.Lock()
|
||||
a.AssembleWithContext(packet.NetworkLayer().NetworkFlow(), tcp, &c)
|
||||
a.assemblerMutex.Unlock()
|
||||
}
|
||||
|
||||
done := *maxcount > 0 && int64(diagnose.AppStats.PacketsCount) >= *maxcount
|
||||
if done {
|
||||
errorMapLen, _ := diagnose.TapErrors.GetErrorsSummary()
|
||||
logger.Log.Infof("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)",
|
||||
diagnose.AppStats.PacketsCount,
|
||||
diagnose.AppStats.ProcessedBytes,
|
||||
time.Since(diagnose.AppStats.StartTime),
|
||||
diagnose.TapErrors.ErrorsCount,
|
||||
errorMapLen)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-signalChan:
|
||||
logger.Log.Infof("Caught SIGINT: aborting")
|
||||
done = true
|
||||
default:
|
||||
// NOP: continue
|
||||
}
|
||||
if done {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
a.assemblerMutex.Lock()
|
||||
closed := a.FlushAll()
|
||||
a.assemblerMutex.Unlock()
|
||||
logger.Log.Debugf("Final flush: %d closed", closed)
|
||||
}
|
||||
|
||||
func (a *tcpAssembler) dumpStreamPool() {
|
||||
a.streamPool.Dump()
|
||||
}
|
||||
|
||||
func (a *tcpAssembler) waitAndDump() {
|
||||
a.streamFactory.WaitGoRoutines()
|
||||
a.assemblerMutex.Lock()
|
||||
logger.Log.Debugf("%s", a.Dump())
|
||||
a.assemblerMutex.Unlock()
|
||||
}
|
||||
@@ -2,7 +2,6 @@ package tap
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"sync"
|
||||
@@ -20,13 +19,6 @@ type tcpReaderDataMsg struct {
|
||||
timestamp time.Time
|
||||
}
|
||||
|
||||
type tcpID struct {
|
||||
srcIP string
|
||||
dstIP string
|
||||
srcPort string
|
||||
dstPort string
|
||||
}
|
||||
|
||||
type ConnectionInfo struct {
|
||||
ClientIP string
|
||||
ClientPort string
|
||||
@@ -35,10 +27,6 @@ type ConnectionInfo struct {
|
||||
IsOutgoing bool
|
||||
}
|
||||
|
||||
func (tid *tcpID) String() string {
|
||||
return fmt.Sprintf("%s->%s %s->%s", tid.srcIP, tid.dstIP, tid.srcPort, tid.dstPort)
|
||||
}
|
||||
|
||||
/* tcpReader gets reads from a channel of bytes of tcp payload, and parses it into requests and responses.
|
||||
* The payload is written to the channel by a tcpStream object that is dedicated to one tcp connection.
|
||||
* An tcpReader object is unidirectional: it parses either a client stream or a server stream.
|
||||
@@ -54,7 +42,6 @@ type tcpReader struct {
|
||||
data []byte
|
||||
superTimer *api.SuperTimer
|
||||
parent *tcpStream
|
||||
messageCount uint
|
||||
packetsSeen uint
|
||||
outboundLinkWriter *OutboundLinkWriter
|
||||
extension *api.Extension
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"github.com/google/gopacket/layers" // pulls in all layers decoders
|
||||
"github.com/google/gopacket/reassembly"
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
"github.com/up9inc/mizu/tap/diagnose"
|
||||
)
|
||||
|
||||
/* It's a connection (bidirectional)
|
||||
@@ -28,19 +29,19 @@ type tcpStream struct {
|
||||
isTapTarget bool
|
||||
clients []tcpReader
|
||||
servers []tcpReader
|
||||
urls []string
|
||||
ident string
|
||||
sync.Mutex
|
||||
streamsMap *tcpStreamMap
|
||||
}
|
||||
|
||||
func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassembly.TCPFlowDirection, nextSeq reassembly.Sequence, start *bool, ac reassembly.AssemblerContext) bool {
|
||||
// FSM
|
||||
if !t.tcpstate.CheckState(tcp, dir) {
|
||||
SilentError("FSM-rejection", "%s: Packet rejected by FSM (state:%s)", t.ident, t.tcpstate.String())
|
||||
stats.rejectFsm++
|
||||
diagnose.TapErrors.SilentError("FSM-rejection", "%s: Packet rejected by FSM (state:%s)", t.ident, t.tcpstate.String())
|
||||
diagnose.InternalStats.RejectFsm++
|
||||
if !t.fsmerr {
|
||||
t.fsmerr = true
|
||||
stats.rejectConnFsm++
|
||||
diagnose.InternalStats.RejectConnFsm++
|
||||
}
|
||||
if !*ignorefsmerr {
|
||||
return false
|
||||
@@ -49,8 +50,8 @@ func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassem
|
||||
// Options
|
||||
err := t.optchecker.Accept(tcp, ci, dir, nextSeq, start)
|
||||
if err != nil {
|
||||
SilentError("OptionChecker-rejection", "%s: Packet rejected by OptionChecker: %s", t.ident, err)
|
||||
stats.rejectOpt++
|
||||
diagnose.TapErrors.SilentError("OptionChecker-rejection", "%s: Packet rejected by OptionChecker: %s", t.ident, err)
|
||||
diagnose.InternalStats.RejectOpt++
|
||||
if !*nooptcheck {
|
||||
return false
|
||||
}
|
||||
@@ -60,15 +61,15 @@ func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassem
|
||||
if *checksum {
|
||||
c, err := tcp.ComputeChecksum()
|
||||
if err != nil {
|
||||
SilentError("ChecksumCompute", "%s: Got error computing checksum: %s", t.ident, err)
|
||||
diagnose.TapErrors.SilentError("ChecksumCompute", "%s: Got error computing checksum: %s", t.ident, err)
|
||||
accept = false
|
||||
} else if c != 0x0 {
|
||||
SilentError("Checksum", "%s: Invalid checksum: 0x%x", t.ident, c)
|
||||
diagnose.TapErrors.SilentError("Checksum", "%s: Invalid checksum: 0x%x", t.ident, c)
|
||||
accept = false
|
||||
}
|
||||
}
|
||||
if !accept {
|
||||
stats.rejectOpt++
|
||||
diagnose.InternalStats.RejectOpt++
|
||||
}
|
||||
return accept
|
||||
}
|
||||
@@ -79,28 +80,28 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
|
||||
// update stats
|
||||
sgStats := sg.Stats()
|
||||
if skip > 0 {
|
||||
stats.missedBytes += skip
|
||||
diagnose.InternalStats.MissedBytes += skip
|
||||
}
|
||||
stats.sz += length - saved
|
||||
stats.pkt += sgStats.Packets
|
||||
diagnose.InternalStats.Sz += length - saved
|
||||
diagnose.InternalStats.Pkt += sgStats.Packets
|
||||
if sgStats.Chunks > 1 {
|
||||
stats.reassembled++
|
||||
diagnose.InternalStats.Reassembled++
|
||||
}
|
||||
stats.outOfOrderPackets += sgStats.QueuedPackets
|
||||
stats.outOfOrderBytes += sgStats.QueuedBytes
|
||||
if length > stats.biggestChunkBytes {
|
||||
stats.biggestChunkBytes = length
|
||||
diagnose.InternalStats.OutOfOrderPackets += sgStats.QueuedPackets
|
||||
diagnose.InternalStats.OutOfOrderBytes += sgStats.QueuedBytes
|
||||
if length > diagnose.InternalStats.BiggestChunkBytes {
|
||||
diagnose.InternalStats.BiggestChunkBytes = length
|
||||
}
|
||||
if sgStats.Packets > stats.biggestChunkPackets {
|
||||
stats.biggestChunkPackets = sgStats.Packets
|
||||
if sgStats.Packets > diagnose.InternalStats.BiggestChunkPackets {
|
||||
diagnose.InternalStats.BiggestChunkPackets = sgStats.Packets
|
||||
}
|
||||
if sgStats.OverlapBytes != 0 && sgStats.OverlapPackets == 0 {
|
||||
// In the original example this was handled with panic().
|
||||
// I don't know what this error means or how to handle it properly.
|
||||
SilentError("Invalid-Overlap", "bytes:%d, pkts:%d", sgStats.OverlapBytes, sgStats.OverlapPackets)
|
||||
diagnose.TapErrors.SilentError("Invalid-Overlap", "bytes:%d, pkts:%d", sgStats.OverlapBytes, sgStats.OverlapPackets)
|
||||
}
|
||||
stats.overlapBytes += sgStats.OverlapBytes
|
||||
stats.overlapPackets += sgStats.OverlapPackets
|
||||
diagnose.InternalStats.OverlapBytes += sgStats.OverlapBytes
|
||||
diagnose.InternalStats.OverlapPackets += sgStats.OverlapPackets
|
||||
|
||||
var ident string
|
||||
if dir == reassembly.TCPDirClientToServer {
|
||||
@@ -108,7 +109,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
|
||||
} else {
|
||||
ident = fmt.Sprintf("%v %v(%s): ", t.net.Reverse(), t.transport.Reverse(), dir)
|
||||
}
|
||||
Debug("%s: SG reassembled packet with %d bytes (start:%v,end:%v,skip:%d,saved:%d,nb:%d,%d,overlap:%d,%d)", ident, length, start, end, skip, saved, sgStats.Packets, sgStats.Chunks, sgStats.OverlapBytes, sgStats.OverlapPackets)
|
||||
diagnose.TapErrors.Debug("%s: SG reassembled packet with %d bytes (start:%v,end:%v,skip:%d,saved:%d,nb:%d,%d,overlap:%d,%d)", ident, length, start, end, skip, saved, sgStats.Packets, sgStats.Chunks, sgStats.OverlapBytes, sgStats.OverlapPackets)
|
||||
if skip == -1 && *allowmissinginit {
|
||||
// this is allowed
|
||||
} else if skip != 0 {
|
||||
@@ -127,18 +128,18 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
|
||||
}
|
||||
dnsSize := binary.BigEndian.Uint16(data[:2])
|
||||
missing := int(dnsSize) - len(data[2:])
|
||||
Debug("dnsSize: %d, missing: %d", dnsSize, missing)
|
||||
diagnose.TapErrors.Debug("dnsSize: %d, missing: %d", dnsSize, missing)
|
||||
if missing > 0 {
|
||||
Debug("Missing some bytes: %d", missing)
|
||||
diagnose.TapErrors.Debug("Missing some bytes: %d", missing)
|
||||
sg.KeepFrom(0)
|
||||
return
|
||||
}
|
||||
p := gopacket.NewDecodingLayerParser(layers.LayerTypeDNS, dns)
|
||||
err := p.DecodeLayers(data[2:], &decoded)
|
||||
if err != nil {
|
||||
SilentError("DNS-parser", "Failed to decode DNS: %v", err)
|
||||
diagnose.TapErrors.SilentError("DNS-parser", "Failed to decode DNS: %v", err)
|
||||
} else {
|
||||
Debug("DNS: %s", gopacket.LayerDump(dns))
|
||||
diagnose.TapErrors.Debug("DNS: %s", gopacket.LayerDump(dns))
|
||||
}
|
||||
if len(data) > 2+int(dnsSize) {
|
||||
sg.KeepFrom(2 + int(dnsSize))
|
||||
@@ -147,7 +148,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
|
||||
if length > 0 {
|
||||
// This is where we pass the reassembled information onwards
|
||||
// This channel is read by an tcpReader object
|
||||
appStats.IncReassembledTcpPayloadsCount()
|
||||
diagnose.AppStats.IncReassembledTcpPayloadsCount()
|
||||
timestamp := ac.GetCaptureInfo().Timestamp
|
||||
if dir == reassembly.TCPDirClientToServer {
|
||||
for i := range t.clients {
|
||||
@@ -173,7 +174,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
|
||||
}
|
||||
|
||||
func (t *tcpStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool {
|
||||
Debug("%s: Connection closed", t.ident)
|
||||
diagnose.TapErrors.Debug("%s: Connection closed", t.ident)
|
||||
if t.isTapTarget && !t.isClosed {
|
||||
t.Close()
|
||||
}
|
||||
@@ -193,7 +194,7 @@ func (t *tcpStream) Close() {
|
||||
if shouldReturn {
|
||||
return
|
||||
}
|
||||
streams.Delete(t.id)
|
||||
t.streamsMap.Delete(t.id)
|
||||
|
||||
for i := range t.clients {
|
||||
reader := &t.clients[i]
|
||||
|
||||
@@ -22,6 +22,8 @@ type tcpStreamFactory struct {
|
||||
wg sync.WaitGroup
|
||||
outboundLinkWriter *OutboundLinkWriter
|
||||
Emitter api.Emitter
|
||||
streamsMap *tcpStreamMap
|
||||
ownIps []string
|
||||
}
|
||||
|
||||
type tcpStreamWrapper struct {
|
||||
@@ -29,8 +31,24 @@ type tcpStreamWrapper struct {
|
||||
createdAt time.Time
|
||||
}
|
||||
|
||||
var streams *sync.Map = &sync.Map{} // global
|
||||
var streamId int64 = 0
|
||||
func NewTcpStreamFactory(emitter api.Emitter, streamsMap *tcpStreamMap) *tcpStreamFactory {
|
||||
var ownIps []string
|
||||
|
||||
if localhostIPs, err := getLocalhostIPs(); err != nil {
|
||||
// TODO: think this over
|
||||
logger.Log.Info("Failed to get self IP addresses")
|
||||
logger.Log.Errorf("Getting-Self-Address", "Error getting self ip address: %s (%v,%+v)", err, err, err)
|
||||
ownIps = make([]string, 0)
|
||||
} else {
|
||||
ownIps = localhostIPs
|
||||
}
|
||||
|
||||
return &tcpStreamFactory{
|
||||
Emitter: emitter,
|
||||
streamsMap: streamsMap,
|
||||
ownIps: ownIps,
|
||||
}
|
||||
}
|
||||
|
||||
func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.TCP, ac reassembly.AssemblerContext) reassembly.Stream {
|
||||
logger.Log.Debugf("* NEW: %s %s", net, transport)
|
||||
@@ -56,10 +74,10 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
|
||||
ident: fmt.Sprintf("%s:%s", net, transport),
|
||||
optchecker: reassembly.NewTCPOptionCheck(),
|
||||
superIdentifier: &api.SuperIdentifier{},
|
||||
streamsMap: factory.streamsMap,
|
||||
}
|
||||
if stream.isTapTarget {
|
||||
streamId++
|
||||
stream.id = streamId
|
||||
stream.id = factory.streamsMap.nextId()
|
||||
for i, extension := range extensions {
|
||||
counterPair := &api.CounterPair{
|
||||
Request: 0,
|
||||
@@ -102,7 +120,7 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
|
||||
counterPair: counterPair,
|
||||
})
|
||||
|
||||
streams.Store(stream.id, &tcpStreamWrapper{
|
||||
factory.streamsMap.Store(stream.id, &tcpStreamWrapper{
|
||||
stream: stream,
|
||||
createdAt: time.Now(),
|
||||
})
|
||||
@@ -142,9 +160,10 @@ func (factory *tcpStreamFactory) getStreamProps(srcIP string, srcPort string, ds
|
||||
}
|
||||
}
|
||||
|
||||
//lint:ignore U1000 will be used in the future
|
||||
func (factory *tcpStreamFactory) shouldNotifyOnOutboundLink(dstIP string, dstPort int) bool {
|
||||
if inArrayInt(remoteOnlyOutboundPorts, dstPort) {
|
||||
isDirectedHere := inArrayString(ownIps, dstIP)
|
||||
isDirectedHere := inArrayString(factory.ownIps, dstIP)
|
||||
return !isDirectedHere && !isPrivateIP(dstIP)
|
||||
}
|
||||
return true
|
||||
|
||||
72
tap/tcp_streams_map.go
Normal file
72
tap/tcp_streams_map.go
Normal file
@@ -0,0 +1,72 @@
|
||||
package tap
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
_debug "runtime/debug"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
"github.com/up9inc/mizu/tap/diagnose"
|
||||
)
|
||||
|
||||
type tcpStreamMap struct {
|
||||
streams *sync.Map
|
||||
streamId int64
|
||||
}
|
||||
|
||||
func NewTcpStreamMap() *tcpStreamMap {
|
||||
return &tcpStreamMap{
|
||||
streams: &sync.Map{},
|
||||
}
|
||||
}
|
||||
|
||||
func (streamMap *tcpStreamMap) Store(key, value interface{}) {
|
||||
streamMap.streams.Store(key, value)
|
||||
}
|
||||
|
||||
func (streamMap *tcpStreamMap) Delete(key interface{}) {
|
||||
streamMap.streams.Delete(key)
|
||||
}
|
||||
|
||||
func (streamMap *tcpStreamMap) nextId() int64 {
|
||||
streamMap.streamId++
|
||||
return streamMap.streamId
|
||||
}
|
||||
|
||||
func (streamMap *tcpStreamMap) closeTimedoutTcpStreamChannels() {
|
||||
tcpStreamChannelTimeout := GetTcpChannelTimeoutMs()
|
||||
for {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
_debug.FreeOSMemory()
|
||||
streamMap.streams.Range(func(key interface{}, value interface{}) bool {
|
||||
streamWrapper := value.(*tcpStreamWrapper)
|
||||
stream := streamWrapper.stream
|
||||
if stream.superIdentifier.Protocol == nil {
|
||||
if !stream.isClosed && time.Now().After(streamWrapper.createdAt.Add(tcpStreamChannelTimeout)) {
|
||||
stream.Close()
|
||||
diagnose.AppStats.IncDroppedTcpStreams()
|
||||
logger.Log.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d\n",
|
||||
diagnose.AppStats.DroppedTcpStreams, runtime.NumGoroutine(), tcpStreamChannelTimeout/1000000)
|
||||
}
|
||||
} else {
|
||||
if !stream.superIdentifier.IsClosedOthers {
|
||||
for i := range stream.clients {
|
||||
reader := &stream.clients[i]
|
||||
if reader.extension.Protocol != stream.superIdentifier.Protocol {
|
||||
reader.Close()
|
||||
}
|
||||
}
|
||||
for i := range stream.servers {
|
||||
reader := &stream.servers[i]
|
||||
if reader.extension.Protocol != stream.superIdentifier.Protocol {
|
||||
reader.Close()
|
||||
}
|
||||
}
|
||||
stream.superIdentifier.IsClosedOthers = true
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
}
|
||||
1
tap/tester/.gitignore
vendored
Normal file
1
tap/tester/.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
||||
tester
|
||||
12
tap/tester/README.md
Normal file
12
tap/tester/README.md
Normal file
@@ -0,0 +1,12 @@
|
||||
|
||||
This tester used to launch passive-tapper locally without Docker or Kuberenetes environment.
|
||||
|
||||
Its good for testing purposes.
|
||||
|
||||
# How to run
|
||||
|
||||
From the `tap` folder run:
|
||||
`./tester/launch.sh`
|
||||
|
||||
The tester gets the same arguments the passive_tapper gets, run with `--help` to get a complete list of options.
|
||||
`./tester/launch.sh --help`
|
||||
10
tap/tester/launch.sh
Executable file
10
tap/tester/launch.sh
Executable file
@@ -0,0 +1,10 @@
|
||||
#!/bin/bash
|
||||
|
||||
set -e
|
||||
|
||||
echo "Building extensions..."
|
||||
pushd .. && ./devops/build_extensions.sh && popd
|
||||
|
||||
go build -o tester tester/tester.go
|
||||
|
||||
sudo ./tester/tester "$@"
|
||||
114
tap/tester/tester.go
Normal file
114
tap/tester/tester.go
Normal file
@@ -0,0 +1,114 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"plugin"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/op/go-logging"
|
||||
|
||||
"github.com/go-errors/errors"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
"github.com/up9inc/mizu/tap"
|
||||
tapApi "github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
func loadExtensions() ([]*tapApi.Extension, error) {
|
||||
extensionsDir := "./extensions"
|
||||
files, err := ioutil.ReadDir(extensionsDir)
|
||||
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, 0)
|
||||
}
|
||||
|
||||
extensions := make([]*tapApi.Extension, 0)
|
||||
for _, file := range files {
|
||||
filename := file.Name()
|
||||
|
||||
if !strings.HasSuffix(filename, ".so") {
|
||||
continue
|
||||
}
|
||||
|
||||
logger.Log.Infof("Loading extension: %s\n", filename)
|
||||
|
||||
extension := &tapApi.Extension{
|
||||
Path: path.Join(extensionsDir, filename),
|
||||
}
|
||||
|
||||
plug, err := plugin.Open(extension.Path)
|
||||
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, 0)
|
||||
}
|
||||
|
||||
extension.Plug = plug
|
||||
symDissector, err := plug.Lookup("Dissector")
|
||||
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, 0)
|
||||
}
|
||||
|
||||
dissector, ok := symDissector.(tapApi.Dissector)
|
||||
|
||||
if !ok {
|
||||
return nil, errors.Errorf("Symbol Dissector type error: %v %T\n", file, symDissector)
|
||||
}
|
||||
|
||||
dissector.Register(extension)
|
||||
extension.Dissector = dissector
|
||||
extensions = append(extensions, extension)
|
||||
}
|
||||
|
||||
sort.Slice(extensions, func(i, j int) bool {
|
||||
return extensions[i].Protocol.Priority < extensions[j].Protocol.Priority
|
||||
})
|
||||
|
||||
for _, extension := range extensions {
|
||||
logger.Log.Infof("Extension Properties: %+v\n", extension)
|
||||
}
|
||||
|
||||
return extensions, nil
|
||||
}
|
||||
|
||||
func internalRun() error {
|
||||
logger.InitLoggerStderrOnly(logging.DEBUG)
|
||||
|
||||
opts := tap.TapOpts{
|
||||
HostMode: false,
|
||||
}
|
||||
|
||||
outputItems := make(chan *tapApi.OutputChannelItem, 1000)
|
||||
extenssions, err := loadExtensions()
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tapOpts := tapApi.TrafficFilteringOptions{}
|
||||
|
||||
tap.StartPassiveTapper(&opts, outputItems, extenssions, &tapOpts)
|
||||
|
||||
logger.Log.Infof("Tapping, press enter to exit...\n")
|
||||
reader := bufio.NewReader(os.Stdin)
|
||||
reader.ReadLine()
|
||||
return nil
|
||||
}
|
||||
|
||||
func main() {
|
||||
err := internalRun()
|
||||
|
||||
if err != nil {
|
||||
switch err := err.(type) {
|
||||
case *errors.Error:
|
||||
logger.Log.Errorf("Error: %v\n", err.ErrorStack())
|
||||
default:
|
||||
logger.Log.Errorf("Error: %v\n", err)
|
||||
}
|
||||
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
135
ui/.snyk
Normal file
135
ui/.snyk
Normal file
@@ -0,0 +1,135 @@
|
||||
# Snyk (https://snyk.io) policy file, patches or ignores known vulnerabilities.
|
||||
version: v1.14.0
|
||||
ignore:
|
||||
SNYK-JS-AXIOS-1579269:
|
||||
- '*':
|
||||
reason: None Given
|
||||
SNYK-JS-TRIMNEWLINES-1298042:
|
||||
- '*':
|
||||
reason: None Given
|
||||
SNYK-JS-ANSIHTML-1296849:
|
||||
- '*':
|
||||
reason: None Given
|
||||
SNYK-JS-ANSIREGEX-1583908:
|
||||
- '*':
|
||||
reason: None Given
|
||||
SNYK-JS-BROWSERSLIST-1090194:
|
||||
- '*':
|
||||
reason: None Given
|
||||
SNYK-JS-CSSWHAT-1298035:
|
||||
- '*':
|
||||
reason: None Given
|
||||
SNYK-JS-DNSPACKET-1293563:
|
||||
- '*':
|
||||
reason: None Given
|
||||
SNYK-JS-EJS-1049328:
|
||||
- '*':
|
||||
reason: None Given
|
||||
SNYK-JS-GLOBPARENT-1016905:
|
||||
- '*':
|
||||
reason: None Given
|
||||
SNYK-JS-IMMER-1540542:
|
||||
- '*':
|
||||
reason: None Given
|
||||
SNYK-JS-LODASHTEMPLATE-1088054:
|
||||
- '*':
|
||||
reason: None Given
|
||||
SNYK-JS-NODESASS-1059081:
|
||||
- '*':
|
||||
reason: None Given
|
||||
SNYK-JS-NODESASS-535498:
|
||||
- '*':
|
||||
reason: None Given
|
||||
SNYK-JS-NODESASS-535500:
|
||||
- '*':
|
||||
reason: None Given
|
||||
SNYK-JS-NODESASS-535502:
|
||||
- '*':
|
||||
reason: None Given
|
||||
SNYK-JS-NODESASS-540956:
|
||||
- '*':
|
||||
reason: Non given
|
||||
SNYK-JS-NODESASS-540958:
|
||||
- '*':
|
||||
reason: Non given
|
||||
SNYK-JS-NODESASS-540964:
|
||||
- '*':
|
||||
reason: Non given
|
||||
SNYK-JS-NODESASS-540978:
|
||||
- '*':
|
||||
reason: Non given
|
||||
SNYK-JS-NODESASS-540980:
|
||||
- '*':
|
||||
reason: Non given
|
||||
SNYK-JS-NODESASS-540990:
|
||||
- '*':
|
||||
reason: Non given
|
||||
SNYK-JS-NODESASS-540992:
|
||||
- '*':
|
||||
reason: Non given
|
||||
SNYK-JS-NODESASS-540994:
|
||||
- '*':
|
||||
reason: Non given
|
||||
SNYK-JS-NODESASS-540996:
|
||||
- '*':
|
||||
reason: Non given
|
||||
SNYK-JS-NODESASS-540998:
|
||||
- '*':
|
||||
reason: Non given
|
||||
SNYK-JS-NODESASS-541000:
|
||||
- '*':
|
||||
reason: Non given
|
||||
SNYK-JS-NODESASS-541002:
|
||||
- '*':
|
||||
reason: Non given
|
||||
SNYK-JS-NTHCHECK-1586032:
|
||||
- '*':
|
||||
reason: Non given
|
||||
SNYK-JS-PATHPARSE-1077067:
|
||||
- '*':
|
||||
reason: Non given
|
||||
SNYK-JS-POSTCSS-1090595:
|
||||
- '*':
|
||||
reason: Non given
|
||||
SNYK-JS-POSTCSS-1255640:
|
||||
- '*':
|
||||
reason: Non given
|
||||
SNYK-JS-PRISMJS-1314893:
|
||||
- '*':
|
||||
reason: Non given
|
||||
SNYK-JS-PRISMJS-1585202:
|
||||
- '*':
|
||||
reason: Non given
|
||||
SNYK-JS-PROMPTS-1729737:
|
||||
- '*':
|
||||
reason: Non given
|
||||
SNYK-JS-SHELLQUOTE-1766506:
|
||||
- '*':
|
||||
reason: Non given
|
||||
SNYK-JS-TAR-1536528:
|
||||
- '*':
|
||||
reason: Non given
|
||||
SNYK-JS-TAR-1536531:
|
||||
- '*':
|
||||
reason: Non given
|
||||
SNYK-JS-TAR-1536758:
|
||||
- '*':
|
||||
reason: Non given
|
||||
SNYK-JS-TAR-1579147:
|
||||
- '*':
|
||||
reason: Non given
|
||||
SNYK-JS-TAR-1579152:
|
||||
- '*':
|
||||
reason: Non given
|
||||
SNYK-JS-TAR-1579155:
|
||||
- '*':
|
||||
reason: Non given
|
||||
SNYK-JS-TMPL-1583443:
|
||||
- '*':
|
||||
reason: Non given
|
||||
SNYK-JS-URLPARSE-1533425:
|
||||
- '*':
|
||||
reason: Non given
|
||||
SNYK-JS-WS-1296835:
|
||||
- '*':
|
||||
reason: Non given
|
||||
@@ -205,7 +205,7 @@ export const EntryTablePolicySection: React.FC<EntryPolicySectionProps> = ({titl
|
||||
<tbody>
|
||||
{arrayToIterate.map(({rule, matched}, index) => {
|
||||
return (
|
||||
<EntryPolicySectionContainer key={index} label={rule.Name} matched={matched && (rule.Type === 'latency' ? rule.Latency >= latency : true)? "Success" : "Failure"}>
|
||||
<EntryPolicySectionContainer key={index} label={rule.Name} matched={matched && (rule.Type === 'slo' ? rule.ResponseTime >= latency : true)? "Success" : "Failure"}>
|
||||
{
|
||||
<>
|
||||
{
|
||||
@@ -213,8 +213,8 @@ export const EntryTablePolicySection: React.FC<EntryPolicySectionProps> = ({titl
|
||||
<tr className={styles.dataValue}><td><b>Key:</b></td> <td>{rule.Key}</td></tr>
|
||||
}
|
||||
{
|
||||
rule.Latency !== 0 &&
|
||||
<tr className={styles.dataValue}><td><b>Latency:</b></td> <td>{rule.Latency}</td></tr>
|
||||
rule.ResponseTime !== 0 &&
|
||||
<tr className={styles.dataValue}><td><b>Response Time:</b></td> <td>{rule.ResponseTime}</td></tr>
|
||||
}
|
||||
{
|
||||
rule.Method &&
|
||||
|
||||
Reference in New Issue
Block a user