mirror of
https://github.com/kubeshark/kubeshark.git
synced 2026-04-23 19:06:45 +00:00
Compare commits
6 Commits
26.0-dev7
...
26.0-dev13
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a42a0cd0b9 | ||
|
|
145004fe43 | ||
|
|
0a4674ea7c | ||
|
|
f013b0f03c | ||
|
|
2c72c27bc9 | ||
|
|
98ca1b2840 |
8
.github/workflows/acceptance_tests.yml
vendored
8
.github/workflows/acceptance_tests.yml
vendored
@@ -8,19 +8,15 @@ on:
|
||||
branches:
|
||||
- 'develop'
|
||||
|
||||
concurrency:
|
||||
group: mizu-acceptance-tests-${{ github.ref }}
|
||||
cancel-in-progress: true
|
||||
|
||||
jobs:
|
||||
run-acceptance-tests:
|
||||
name: Run acceptance tests
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Set up Go 1.16
|
||||
- name: Set up Go 1.17
|
||||
uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: '^1.16'
|
||||
go-version: '^1.17'
|
||||
|
||||
- name: Check out code into the Go module directory
|
||||
uses: actions/checkout@v2
|
||||
|
||||
4
.github/workflows/build.yml
vendored
4
.github/workflows/build.yml
vendored
@@ -15,10 +15,10 @@ jobs:
|
||||
name: CLI executable build
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Set up Go 1.16
|
||||
- name: Set up Go 1.17
|
||||
uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: '1.16'
|
||||
go-version: '1.17'
|
||||
|
||||
- name: Check out code into the Go module directory
|
||||
uses: actions/checkout@v2
|
||||
|
||||
4
.github/workflows/release.yml
vendored
4
.github/workflows/release.yml
vendored
@@ -229,10 +229,10 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
needs: [docker-manifest, gcp-registry]
|
||||
steps:
|
||||
- name: Set up Go 1.16
|
||||
- name: Set up Go 1.17
|
||||
uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: '1.16'
|
||||
go-version: '1.17'
|
||||
|
||||
- name: Check out the repo
|
||||
uses: actions/checkout@v2
|
||||
|
||||
2
.github/workflows/static_code_analysis.yml
vendored
2
.github/workflows/static_code_analysis.yml
vendored
@@ -17,7 +17,7 @@ jobs:
|
||||
- uses: actions/checkout@v2
|
||||
- uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: '^1.16'
|
||||
go-version: '^1.17'
|
||||
|
||||
- name: Install dependencies
|
||||
run: |
|
||||
|
||||
4
.github/workflows/test.yml
vendored
4
.github/workflows/test.yml
vendored
@@ -19,10 +19,10 @@ jobs:
|
||||
name: Unit Tests
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Set up Go 1.16
|
||||
- name: Set up Go 1.17
|
||||
uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: '^1.16'
|
||||
go-version: '^1.17'
|
||||
|
||||
- name: Check out code into the Go module directory
|
||||
uses: actions/checkout@v2
|
||||
|
||||
@@ -14,7 +14,7 @@ RUN npm run build
|
||||
RUN npm run build-ent
|
||||
|
||||
### Base builder image for native builds architecture
|
||||
FROM golang:1.16-alpine AS builder-native-base
|
||||
FROM golang:1.17-alpine AS builder-native-base
|
||||
ENV CGO_ENABLED=1 GOOS=linux
|
||||
RUN apk add libpcap-dev g++ perl-utils
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
module github.com/up9inc/mizu/tests
|
||||
|
||||
go 1.16
|
||||
go 1.17
|
||||
|
||||
require (
|
||||
github.com/go-redis/redis/v8 v8.11.4
|
||||
@@ -10,6 +10,40 @@ require (
|
||||
k8s.io/client-go v0.21.2
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/cespare/xxhash/v2 v2.1.2 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||
github.com/go-logr/logr v0.4.0 // indirect
|
||||
github.com/gogo/protobuf v1.3.2 // indirect
|
||||
github.com/golang-jwt/jwt/v4 v4.1.0 // indirect
|
||||
github.com/golang/protobuf v1.5.2 // indirect
|
||||
github.com/google/go-cmp v0.5.6 // indirect
|
||||
github.com/google/gofuzz v1.1.0 // indirect
|
||||
github.com/googleapis/gnostic v0.4.1 // indirect
|
||||
github.com/imdario/mergo v0.3.5 // indirect
|
||||
github.com/json-iterator/go v1.1.10 // indirect
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
||||
github.com/modern-go/reflect2 v1.0.1 // indirect
|
||||
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 // indirect
|
||||
github.com/spf13/pflag v1.0.5 // indirect
|
||||
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 // indirect
|
||||
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d // indirect
|
||||
golang.org/x/sys v0.0.0-20210426230700-d19ff857e887 // indirect
|
||||
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d // indirect
|
||||
golang.org/x/text v0.3.6 // indirect
|
||||
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
|
||||
google.golang.org/appengine v1.6.5 // indirect
|
||||
google.golang.org/protobuf v1.26.0 // indirect
|
||||
gopkg.in/inf.v0 v0.9.1 // indirect
|
||||
gopkg.in/yaml.v2 v2.4.0 // indirect
|
||||
k8s.io/api v0.21.2 // indirect
|
||||
k8s.io/klog/v2 v2.8.0 // indirect
|
||||
k8s.io/utils v0.0.0-20201110183641-67b214c5f920 // indirect
|
||||
sigs.k8s.io/structured-merge-diff/v4 v4.1.0 // indirect
|
||||
sigs.k8s.io/yaml v1.2.0 // indirect
|
||||
)
|
||||
|
||||
replace github.com/up9inc/mizu/shared v0.0.0 => ../shared
|
||||
|
||||
replace github.com/up9inc/mizu/tap/api v0.0.0 => ../tap/api
|
||||
|
||||
89
agent/go.mod
89
agent/go.mod
@@ -1,6 +1,6 @@
|
||||
module github.com/up9inc/mizu/agent
|
||||
|
||||
go 1.16
|
||||
go 1.17
|
||||
|
||||
require (
|
||||
github.com/antelman107/net-wait-go v0.0.0-20210623112055-cf684aebda7b
|
||||
@@ -37,6 +37,93 @@ require (
|
||||
k8s.io/client-go v0.21.2
|
||||
)
|
||||
|
||||
require (
|
||||
cloud.google.com/go v0.65.0 // indirect
|
||||
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
|
||||
github.com/Azure/go-autorest/autorest v0.11.12 // indirect
|
||||
github.com/Azure/go-autorest/autorest/adal v0.9.5 // indirect
|
||||
github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect
|
||||
github.com/Azure/go-autorest/logger v0.2.0 // indirect
|
||||
github.com/Azure/go-autorest/tracing v0.6.0 // indirect
|
||||
github.com/PuerkitoBio/purell v1.1.1 // indirect
|
||||
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
|
||||
github.com/asaskevich/govalidator v0.0.0-20200907205600-7a23bdc65eef // indirect
|
||||
github.com/beevik/etree v1.1.0 // indirect
|
||||
github.com/bradleyfalzon/tlsx v0.0.0-20170624122154-28fd0e59bac4 // indirect
|
||||
github.com/chanced/dynamic v0.0.0-20210502140838-c010b5fc3e44 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/fatih/camelcase v1.0.0 // indirect
|
||||
github.com/form3tech-oss/jwt-go v3.2.2+incompatible // indirect
|
||||
github.com/ghodss/yaml v1.0.0 // indirect
|
||||
github.com/gin-contrib/sse v0.1.0 // indirect
|
||||
github.com/go-logr/logr v0.4.0 // indirect
|
||||
github.com/go-openapi/analysis v0.20.0 // indirect
|
||||
github.com/go-openapi/errors v0.20.1 // indirect
|
||||
github.com/go-openapi/jsonpointer v0.19.5 // indirect
|
||||
github.com/go-openapi/jsonreference v0.19.5 // indirect
|
||||
github.com/go-openapi/loads v0.20.2 // indirect
|
||||
github.com/go-openapi/runtime v0.20.0 // indirect
|
||||
github.com/go-openapi/spec v0.20.3 // indirect
|
||||
github.com/go-openapi/strfmt v0.20.3 // indirect
|
||||
github.com/go-openapi/swag v0.19.15 // indirect
|
||||
github.com/go-openapi/validate v0.20.3 // indirect
|
||||
github.com/go-stack/stack v1.8.0 // indirect
|
||||
github.com/gogo/protobuf v1.3.2 // indirect
|
||||
github.com/golang-jwt/jwt/v4 v4.1.0 // indirect
|
||||
github.com/golang/protobuf v1.4.3 // indirect
|
||||
github.com/golang/snappy v0.0.1 // indirect
|
||||
github.com/google/go-cmp v0.5.6 // indirect
|
||||
github.com/google/gofuzz v1.1.0 // indirect
|
||||
github.com/google/gopacket v1.1.19 // indirect
|
||||
github.com/google/martian v2.1.0+incompatible // indirect
|
||||
github.com/googleapis/gnostic v0.4.1 // indirect
|
||||
github.com/hashicorp/golang-lru v0.5.1 // indirect
|
||||
github.com/imdario/mergo v0.3.5 // indirect
|
||||
github.com/josharian/intern v1.0.0 // indirect
|
||||
github.com/json-iterator/go v1.1.10 // indirect
|
||||
github.com/klauspost/compress v1.14.1 // indirect
|
||||
github.com/leodido/go-urn v1.2.0 // indirect
|
||||
github.com/mailru/easyjson v0.7.6 // indirect
|
||||
github.com/mattn/go-isatty v0.0.12 // indirect
|
||||
github.com/mitchellh/mapstructure v1.4.1 // indirect
|
||||
github.com/moby/spdystream v0.2.0 // indirect
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
||||
github.com/modern-go/reflect2 v1.0.1 // indirect
|
||||
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
|
||||
github.com/ohler55/ojg v1.12.12 // indirect
|
||||
github.com/oklog/ulid v1.3.1 // indirect
|
||||
github.com/opentracing/opentracing-go v1.2.0 // indirect
|
||||
github.com/pierrec/lz4 v2.6.0+incompatible // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/santhosh-tekuri/jsonschema/v5 v5.0.0 // indirect
|
||||
github.com/segmentio/kafka-go v0.4.27 // indirect
|
||||
github.com/spf13/pflag v1.0.5 // indirect
|
||||
github.com/tidwall/gjson v1.12.0 // indirect
|
||||
github.com/tidwall/match v1.1.1 // indirect
|
||||
github.com/tidwall/pretty v1.2.0 // indirect
|
||||
github.com/tidwall/sjson v1.2.3 // indirect
|
||||
github.com/ugorji/go/codec v1.1.7 // indirect
|
||||
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f // indirect
|
||||
go.mongodb.org/mongo-driver v1.5.1 // indirect
|
||||
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83 // indirect
|
||||
golang.org/x/net v0.0.0-20210224082022-3d97a244fca7 // indirect
|
||||
golang.org/x/oauth2 v0.0.0-20210323180902-22b0adad7558 // indirect
|
||||
golang.org/x/sys v0.0.0-20210426230700-d19ff857e887 // indirect
|
||||
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d // indirect
|
||||
golang.org/x/text v0.3.5 // indirect
|
||||
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
|
||||
google.golang.org/appengine v1.6.6 // indirect
|
||||
google.golang.org/protobuf v1.25.0 // indirect
|
||||
gopkg.in/inf.v0 v0.9.1 // indirect
|
||||
gopkg.in/yaml.v2 v2.4.0 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
|
||||
k8s.io/klog/v2 v2.8.0 // indirect
|
||||
k8s.io/kubectl v0.21.2 // indirect
|
||||
k8s.io/utils v0.0.0-20201110183641-67b214c5f920 // indirect
|
||||
sigs.k8s.io/structured-merge-diff/v4 v4.1.0 // indirect
|
||||
sigs.k8s.io/yaml v1.3.0 // indirect
|
||||
)
|
||||
|
||||
replace github.com/up9inc/mizu/shared v0.0.0 => ../shared
|
||||
|
||||
replace github.com/up9inc/mizu/tap v0.0.0 => ../tap
|
||||
|
||||
303
agent/main.go
303
agent/main.go
@@ -9,12 +9,14 @@ import (
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/gin-contrib/static"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/up9inc/mizu/agent/pkg/elastic"
|
||||
"github.com/up9inc/mizu/agent/pkg/middlewares"
|
||||
"github.com/up9inc/mizu/agent/pkg/models"
|
||||
"github.com/up9inc/mizu/agent/pkg/oas"
|
||||
@@ -23,30 +25,18 @@ import (
|
||||
"github.com/up9inc/mizu/agent/pkg/up9"
|
||||
"github.com/up9inc/mizu/agent/pkg/utils"
|
||||
|
||||
"github.com/up9inc/mizu/agent/pkg/elastic"
|
||||
|
||||
"github.com/up9inc/mizu/agent/pkg/controllers"
|
||||
|
||||
"github.com/up9inc/mizu/agent/pkg/api"
|
||||
"github.com/up9inc/mizu/agent/pkg/app"
|
||||
"github.com/up9inc/mizu/agent/pkg/config"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
|
||||
"github.com/antelman107/net-wait-go/wait"
|
||||
"github.com/gin-contrib/static"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/op/go-logging"
|
||||
basenine "github.com/up9inc/basenine/client/go"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
"github.com/up9inc/mizu/tap"
|
||||
tapApi "github.com/up9inc/mizu/tap/api"
|
||||
|
||||
amqpExt "github.com/up9inc/mizu/tap/extensions/amqp"
|
||||
httpExt "github.com/up9inc/mizu/tap/extensions/http"
|
||||
kafkaExt "github.com/up9inc/mizu/tap/extensions/kafka"
|
||||
redisExt "github.com/up9inc/mizu/tap/extensions/redis"
|
||||
)
|
||||
|
||||
var tapperMode = flag.Bool("tap", false, "Run in tapper mode without API")
|
||||
@@ -56,10 +46,6 @@ var apiServerAddress = flag.String("api-server-address", "", "Address of mizu AP
|
||||
var namespace = flag.String("namespace", "", "Resolve IPs if they belong to resources in this namespace (default is all)")
|
||||
var harsReaderMode = flag.Bool("hars-read", false, "Run in hars-read mode")
|
||||
var harsDir = flag.String("hars-dir", "", "Directory to read hars from")
|
||||
|
||||
var extensions []*tapApi.Extension // global
|
||||
var extensionsMap map[string]*tapApi.Extension // global
|
||||
|
||||
var startTime int64
|
||||
|
||||
const (
|
||||
@@ -75,78 +61,20 @@ func main() {
|
||||
if err := config.LoadConfig(); err != nil {
|
||||
logger.Log.Fatalf("Error loading config file %v", err)
|
||||
}
|
||||
loadExtensions()
|
||||
app.LoadExtensions()
|
||||
|
||||
if !*tapperMode && !*apiServerMode && !*standaloneMode && !*harsReaderMode {
|
||||
panic("One of the flags --tap, --api or --standalone or --hars-read must be provided")
|
||||
}
|
||||
|
||||
if *standaloneMode {
|
||||
api.StartResolving(*namespace)
|
||||
|
||||
outputItemsChannel := make(chan *tapApi.OutputChannelItem)
|
||||
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
|
||||
|
||||
filteringOptions := getTrafficFilteringOptions()
|
||||
hostMode := os.Getenv(shared.HostModeEnvVar) == "1"
|
||||
tapOpts := &tap.TapOpts{HostMode: hostMode}
|
||||
tap.StartPassiveTapper(tapOpts, outputItemsChannel, extensions, filteringOptions)
|
||||
|
||||
go filterItems(outputItemsChannel, filteredOutputItemsChannel)
|
||||
go api.StartReadingEntries(filteredOutputItemsChannel, nil, extensionsMap)
|
||||
|
||||
hostApi(nil)
|
||||
runInStandaloneMode()
|
||||
} else if *tapperMode {
|
||||
logger.Log.Infof("Starting tapper, websocket address: %s", *apiServerAddress)
|
||||
if *apiServerAddress == "" {
|
||||
panic("API server address must be provided with --api-server-address when using --tap")
|
||||
}
|
||||
|
||||
hostMode := os.Getenv(shared.HostModeEnvVar) == "1"
|
||||
tapOpts := &tap.TapOpts{HostMode: hostMode}
|
||||
tapTargets := getTapTargets()
|
||||
if tapTargets != nil {
|
||||
tapOpts.FilterAuthorities = tapTargets
|
||||
logger.Log.Infof("Filtering for the following authorities: %v", tapOpts.FilterAuthorities)
|
||||
}
|
||||
|
||||
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
|
||||
|
||||
filteringOptions := getTrafficFilteringOptions()
|
||||
tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, extensions, filteringOptions)
|
||||
socketConnection, err := dialSocketWithRetry(*apiServerAddress, socketConnectionRetries, socketConnectionRetryDelay)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Error connecting to socket server at %s %v", *apiServerAddress, err))
|
||||
}
|
||||
logger.Log.Infof("Connected successfully to websocket %s", *apiServerAddress)
|
||||
|
||||
go pipeTapChannelToSocket(socketConnection, filteredOutputItemsChannel)
|
||||
runInTapperMode()
|
||||
} else if *apiServerMode {
|
||||
configureBasenineServer(shared.BasenineHost, shared.BaseninePort)
|
||||
startTime = time.Now().UnixNano() / int64(time.Millisecond)
|
||||
api.StartResolving(*namespace)
|
||||
|
||||
outputItemsChannel := make(chan *tapApi.OutputChannelItem)
|
||||
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
|
||||
enableExpFeatureIfNeeded()
|
||||
go filterItems(outputItemsChannel, filteredOutputItemsChannel)
|
||||
go api.StartReadingEntries(filteredOutputItemsChannel, nil, extensionsMap)
|
||||
|
||||
syncEntriesConfig := getSyncEntriesConfig()
|
||||
if syncEntriesConfig != nil {
|
||||
if err := up9.SyncEntries(syncEntriesConfig); err != nil {
|
||||
logger.Log.Error("Error syncing entries, err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
hostApi(outputItemsChannel)
|
||||
utils.StartServer(runInApiServerMode(*namespace))
|
||||
} else if *harsReaderMode {
|
||||
outputItemsChannel := make(chan *tapApi.OutputChannelItem, 1000)
|
||||
filteredHarChannel := make(chan *tapApi.OutputChannelItem)
|
||||
|
||||
go filterItems(outputItemsChannel, filteredHarChannel)
|
||||
go api.StartReadingEntries(filteredHarChannel, harsDir, extensionsMap)
|
||||
hostApi(nil)
|
||||
runInHarReaderMode()
|
||||
}
|
||||
|
||||
signalChan := make(chan os.Signal, 1)
|
||||
@@ -156,89 +84,7 @@ func main() {
|
||||
logger.Log.Info("Exiting")
|
||||
}
|
||||
|
||||
func enableExpFeatureIfNeeded() {
|
||||
if config.Config.OAS {
|
||||
oas.GetOasGeneratorInstance().Start()
|
||||
}
|
||||
if config.Config.ServiceMap {
|
||||
servicemap.GetInstance().SetConfig(config.Config)
|
||||
}
|
||||
elastic.GetInstance().Configure(config.Config.Elastic)
|
||||
}
|
||||
|
||||
func configureBasenineServer(host string, port string) {
|
||||
if !wait.New(
|
||||
wait.WithProto("tcp"),
|
||||
wait.WithWait(200*time.Millisecond),
|
||||
wait.WithBreak(50*time.Millisecond),
|
||||
wait.WithDeadline(5*time.Second),
|
||||
wait.WithDebug(config.Config.LogLevel == logging.DEBUG),
|
||||
).Do([]string{fmt.Sprintf("%s:%s", host, port)}) {
|
||||
logger.Log.Panicf("Basenine is not available!")
|
||||
}
|
||||
|
||||
// Limit the database size to default 200MB
|
||||
err := basenine.Limit(host, port, config.Config.MaxDBSizeBytes)
|
||||
if err != nil {
|
||||
logger.Log.Panicf("Error while limiting database size: %v", err)
|
||||
}
|
||||
|
||||
// Define the macros
|
||||
for _, extension := range extensions {
|
||||
macros := extension.Dissector.Macros()
|
||||
for macro, expanded := range macros {
|
||||
err = basenine.Macro(host, port, macro, expanded)
|
||||
if err != nil {
|
||||
logger.Log.Panicf("Error while adding a macro: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func loadExtensions() {
|
||||
extensions = make([]*tapApi.Extension, 4)
|
||||
extensionsMap = make(map[string]*tapApi.Extension)
|
||||
|
||||
extensionAmqp := &tapApi.Extension{}
|
||||
dissectorAmqp := amqpExt.NewDissector()
|
||||
dissectorAmqp.Register(extensionAmqp)
|
||||
extensionAmqp.Dissector = dissectorAmqp
|
||||
extensions[0] = extensionAmqp
|
||||
extensionsMap[extensionAmqp.Protocol.Name] = extensionAmqp
|
||||
|
||||
extensionHttp := &tapApi.Extension{}
|
||||
dissectorHttp := httpExt.NewDissector()
|
||||
dissectorHttp.Register(extensionHttp)
|
||||
extensionHttp.Dissector = dissectorHttp
|
||||
extensions[1] = extensionHttp
|
||||
extensionsMap[extensionHttp.Protocol.Name] = extensionHttp
|
||||
|
||||
extensionKafka := &tapApi.Extension{}
|
||||
dissectorKafka := kafkaExt.NewDissector()
|
||||
dissectorKafka.Register(extensionKafka)
|
||||
extensionKafka.Dissector = dissectorKafka
|
||||
extensions[2] = extensionKafka
|
||||
extensionsMap[extensionKafka.Protocol.Name] = extensionKafka
|
||||
|
||||
extensionRedis := &tapApi.Extension{}
|
||||
dissectorRedis := redisExt.NewDissector()
|
||||
dissectorRedis.Register(extensionRedis)
|
||||
extensionRedis.Dissector = dissectorRedis
|
||||
extensions[3] = extensionRedis
|
||||
extensionsMap[extensionRedis.Protocol.Name] = extensionRedis
|
||||
|
||||
sort.Slice(extensions, func(i, j int) bool {
|
||||
return extensions[i].Protocol.Priority < extensions[j].Protocol.Priority
|
||||
})
|
||||
|
||||
for _, extension := range extensions {
|
||||
logger.Log.Infof("Extension Properties: %+v", extension)
|
||||
}
|
||||
|
||||
controllers.InitExtensionsMap(extensionsMap)
|
||||
}
|
||||
|
||||
func hostApi(socketHarOutputChannel chan<- *tapApi.OutputChannelItem) {
|
||||
func hostApi(socketHarOutputChannel chan<- *tapApi.OutputChannelItem) *gin.Engine {
|
||||
app := gin.Default()
|
||||
|
||||
app.GET("/echo", func(c *gin.Context) {
|
||||
@@ -249,7 +95,7 @@ func hostApi(socketHarOutputChannel chan<- *tapApi.OutputChannelItem) {
|
||||
SocketOutChannel: socketHarOutputChannel,
|
||||
}
|
||||
|
||||
app.Use(DisableRootStaticCache())
|
||||
app.Use(disableRootStaticCache())
|
||||
|
||||
var staticFolder string
|
||||
if config.Config.StandaloneMode {
|
||||
@@ -288,10 +134,108 @@ func hostApi(socketHarOutputChannel chan<- *tapApi.OutputChannelItem) {
|
||||
routes.EntriesRoutes(app)
|
||||
routes.MetadataRoutes(app)
|
||||
routes.StatusRoutes(app)
|
||||
utils.StartServer(app)
|
||||
|
||||
return app
|
||||
}
|
||||
|
||||
func DisableRootStaticCache() gin.HandlerFunc {
|
||||
func runInApiServerMode(namespace string) *gin.Engine {
|
||||
app.ConfigureBasenineServer(shared.BasenineHost, shared.BaseninePort)
|
||||
startTime = time.Now().UnixNano() / int64(time.Millisecond)
|
||||
api.StartResolving(namespace)
|
||||
|
||||
enableExpFeatureIfNeeded()
|
||||
|
||||
syncEntriesConfig := getSyncEntriesConfig()
|
||||
if syncEntriesConfig != nil {
|
||||
if err := up9.SyncEntries(syncEntriesConfig); err != nil {
|
||||
logger.Log.Error("Error syncing entries, err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return hostApi(app.GetEntryInputChannel())
|
||||
}
|
||||
|
||||
func runInTapperMode() {
|
||||
logger.Log.Infof("Starting tapper, websocket address: %s", *apiServerAddress)
|
||||
if *apiServerAddress == "" {
|
||||
panic("API server address must be provided with --api-server-address when using --tap")
|
||||
}
|
||||
|
||||
hostMode := os.Getenv(shared.HostModeEnvVar) == "1"
|
||||
tapOpts := &tap.TapOpts{HostMode: hostMode}
|
||||
tapTargets := getTapTargets()
|
||||
if tapTargets != nil {
|
||||
tapOpts.FilterAuthorities = tapTargets
|
||||
logger.Log.Infof("Filtering for the following authorities: %v", tapOpts.FilterAuthorities)
|
||||
}
|
||||
|
||||
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
|
||||
|
||||
filteringOptions := getTrafficFilteringOptions()
|
||||
tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, app.Extensions, filteringOptions)
|
||||
socketConnection, err := dialSocketWithRetry(*apiServerAddress, socketConnectionRetries, socketConnectionRetryDelay)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Error connecting to socket server at %s %v", *apiServerAddress, err))
|
||||
}
|
||||
logger.Log.Infof("Connected successfully to websocket %s", *apiServerAddress)
|
||||
|
||||
go pipeTapChannelToSocket(socketConnection, filteredOutputItemsChannel)
|
||||
}
|
||||
|
||||
func runInStandaloneMode() {
|
||||
api.StartResolving(*namespace)
|
||||
|
||||
outputItemsChannel := make(chan *tapApi.OutputChannelItem)
|
||||
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
|
||||
|
||||
filteringOptions := getTrafficFilteringOptions()
|
||||
hostMode := os.Getenv(shared.HostModeEnvVar) == "1"
|
||||
tapOpts := &tap.TapOpts{HostMode: hostMode}
|
||||
tap.StartPassiveTapper(tapOpts, outputItemsChannel, app.Extensions, filteringOptions)
|
||||
|
||||
go app.FilterItems(outputItemsChannel, filteredOutputItemsChannel)
|
||||
go api.StartReadingEntries(filteredOutputItemsChannel, nil, app.ExtensionsMap)
|
||||
|
||||
ginApp := hostApi(nil)
|
||||
utils.StartServer(ginApp)
|
||||
}
|
||||
|
||||
func runInHarReaderMode() {
|
||||
outputItemsChannel := make(chan *tapApi.OutputChannelItem, 1000)
|
||||
filteredHarChannel := make(chan *tapApi.OutputChannelItem)
|
||||
|
||||
go app.FilterItems(outputItemsChannel, filteredHarChannel)
|
||||
go api.StartReadingEntries(filteredHarChannel, harsDir, app.ExtensionsMap)
|
||||
ginApp := hostApi(nil)
|
||||
utils.StartServer(ginApp)
|
||||
}
|
||||
|
||||
func enableExpFeatureIfNeeded() {
|
||||
if config.Config.OAS {
|
||||
oas.GetOasGeneratorInstance().Start()
|
||||
}
|
||||
if config.Config.ServiceMap {
|
||||
servicemap.GetInstance().SetConfig(config.Config)
|
||||
}
|
||||
elastic.GetInstance().Configure(config.Config.Elastic)
|
||||
}
|
||||
|
||||
func getSyncEntriesConfig() *shared.SyncEntriesConfig {
|
||||
syncEntriesConfigJson := os.Getenv(shared.SyncEntriesConfigEnvVar)
|
||||
if syncEntriesConfigJson == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
var syncEntriesConfig = &shared.SyncEntriesConfig{}
|
||||
err := json.Unmarshal([]byte(syncEntriesConfigJson), syncEntriesConfig)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("env var %s's value of %s is invalid! json must match the shared.SyncEntriesConfig struct, err: %v", shared.SyncEntriesConfigEnvVar, syncEntriesConfigJson, err))
|
||||
}
|
||||
|
||||
return syncEntriesConfig
|
||||
}
|
||||
|
||||
func disableRootStaticCache() gin.HandlerFunc {
|
||||
return func(c *gin.Context) {
|
||||
if c.Request.RequestURI == "/" {
|
||||
// Disable cache only for the main static route
|
||||
@@ -357,16 +301,6 @@ func getTrafficFilteringOptions() *tapApi.TrafficFilteringOptions {
|
||||
return &filteringOptions
|
||||
}
|
||||
|
||||
func filterItems(inChannel <-chan *tapApi.OutputChannelItem, outChannel chan *tapApi.OutputChannelItem) {
|
||||
for message := range inChannel {
|
||||
if message.ConnectionInfo.IsOutgoing && api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) {
|
||||
continue
|
||||
}
|
||||
|
||||
outChannel <- message
|
||||
}
|
||||
}
|
||||
|
||||
func pipeTapChannelToSocket(connection *websocket.Conn, messageDataChannel <-chan *tapApi.OutputChannelItem) {
|
||||
if connection == nil {
|
||||
panic("Websocket connection is nil")
|
||||
@@ -402,21 +336,6 @@ func pipeTapChannelToSocket(connection *websocket.Conn, messageDataChannel <-cha
|
||||
}
|
||||
}
|
||||
|
||||
func getSyncEntriesConfig() *shared.SyncEntriesConfig {
|
||||
syncEntriesConfigJson := os.Getenv(shared.SyncEntriesConfigEnvVar)
|
||||
if syncEntriesConfigJson == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
var syncEntriesConfig = &shared.SyncEntriesConfig{}
|
||||
err := json.Unmarshal([]byte(syncEntriesConfigJson), syncEntriesConfig)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("env var %s's value of %s is invalid! json must match the shared.SyncEntriesConfig struct, err: %v", shared.SyncEntriesConfigEnvVar, syncEntriesConfigJson, err))
|
||||
}
|
||||
|
||||
return syncEntriesConfig
|
||||
}
|
||||
|
||||
func determineLogLevel() (logLevel logging.Level) {
|
||||
logLevel, err := logging.LogLevel(os.Getenv(shared.LogLevelEnvVar))
|
||||
if err != nil {
|
||||
|
||||
@@ -30,14 +30,18 @@ type SocketConnection struct {
|
||||
isTapper bool
|
||||
}
|
||||
|
||||
var websocketUpgrader = websocket.Upgrader{
|
||||
ReadBufferSize: 1024,
|
||||
WriteBufferSize: 1024,
|
||||
}
|
||||
var (
|
||||
websocketUpgrader = websocket.Upgrader{
|
||||
ReadBufferSize: 1024,
|
||||
WriteBufferSize: 1024,
|
||||
}
|
||||
|
||||
var websocketIdsLock = sync.Mutex{}
|
||||
var connectedWebsockets map[int]*SocketConnection
|
||||
var connectedWebsocketIdCounter = 0
|
||||
websocketIdsLock = sync.Mutex{}
|
||||
connectedWebsockets map[int]*SocketConnection
|
||||
connectedWebsocketIdCounter = 0
|
||||
SocketGetBrowserHandler gin.HandlerFunc
|
||||
SocketGetTapperHandler gin.HandlerFunc
|
||||
)
|
||||
|
||||
func init() {
|
||||
websocketUpgrader.CheckOrigin = func(r *http.Request) bool { return true } // like cors for web socket
|
||||
@@ -45,12 +49,20 @@ func init() {
|
||||
}
|
||||
|
||||
func WebSocketRoutes(app *gin.Engine, eventHandlers EventHandlers, startTime int64) {
|
||||
app.GET("/ws", func(c *gin.Context) {
|
||||
SocketGetBrowserHandler = func(c *gin.Context) {
|
||||
websocketHandler(c.Writer, c.Request, eventHandlers, false, startTime)
|
||||
}
|
||||
|
||||
SocketGetTapperHandler = func(c *gin.Context) {
|
||||
websocketHandler(c.Writer, c.Request, eventHandlers, true, startTime)
|
||||
}
|
||||
|
||||
app.GET("/ws", func(c *gin.Context) {
|
||||
SocketGetBrowserHandler(c)
|
||||
})
|
||||
|
||||
app.GET("/wsTapper", func(c *gin.Context) { // TODO: add m2m authentication to this route
|
||||
websocketHandler(c.Writer, c.Request, eventHandlers, true, startTime)
|
||||
SocketGetTapperHandler(c)
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
116
agent/pkg/app/main.go
Normal file
116
agent/pkg/app/main.go
Normal file
@@ -0,0 +1,116 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/antelman107/net-wait-go/wait"
|
||||
"github.com/op/go-logging"
|
||||
basenine "github.com/up9inc/basenine/client/go"
|
||||
"github.com/up9inc/mizu/agent/pkg/api"
|
||||
"github.com/up9inc/mizu/agent/pkg/config"
|
||||
"github.com/up9inc/mizu/agent/pkg/controllers"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
tapApi "github.com/up9inc/mizu/tap/api"
|
||||
amqpExt "github.com/up9inc/mizu/tap/extensions/amqp"
|
||||
httpExt "github.com/up9inc/mizu/tap/extensions/http"
|
||||
kafkaExt "github.com/up9inc/mizu/tap/extensions/kafka"
|
||||
redisExt "github.com/up9inc/mizu/tap/extensions/redis"
|
||||
)
|
||||
|
||||
var (
|
||||
Extensions []*tapApi.Extension // global
|
||||
ExtensionsMap map[string]*tapApi.Extension // global
|
||||
)
|
||||
|
||||
func LoadExtensions() {
|
||||
Extensions = make([]*tapApi.Extension, 4)
|
||||
ExtensionsMap = make(map[string]*tapApi.Extension)
|
||||
|
||||
extensionAmqp := &tapApi.Extension{}
|
||||
dissectorAmqp := amqpExt.NewDissector()
|
||||
dissectorAmqp.Register(extensionAmqp)
|
||||
extensionAmqp.Dissector = dissectorAmqp
|
||||
Extensions[0] = extensionAmqp
|
||||
ExtensionsMap[extensionAmqp.Protocol.Name] = extensionAmqp
|
||||
|
||||
extensionHttp := &tapApi.Extension{}
|
||||
dissectorHttp := httpExt.NewDissector()
|
||||
dissectorHttp.Register(extensionHttp)
|
||||
extensionHttp.Dissector = dissectorHttp
|
||||
Extensions[1] = extensionHttp
|
||||
ExtensionsMap[extensionHttp.Protocol.Name] = extensionHttp
|
||||
|
||||
extensionKafka := &tapApi.Extension{}
|
||||
dissectorKafka := kafkaExt.NewDissector()
|
||||
dissectorKafka.Register(extensionKafka)
|
||||
extensionKafka.Dissector = dissectorKafka
|
||||
Extensions[2] = extensionKafka
|
||||
ExtensionsMap[extensionKafka.Protocol.Name] = extensionKafka
|
||||
|
||||
extensionRedis := &tapApi.Extension{}
|
||||
dissectorRedis := redisExt.NewDissector()
|
||||
dissectorRedis.Register(extensionRedis)
|
||||
extensionRedis.Dissector = dissectorRedis
|
||||
Extensions[3] = extensionRedis
|
||||
ExtensionsMap[extensionRedis.Protocol.Name] = extensionRedis
|
||||
|
||||
sort.Slice(Extensions, func(i, j int) bool {
|
||||
return Extensions[i].Protocol.Priority < Extensions[j].Protocol.Priority
|
||||
})
|
||||
|
||||
for _, extension := range Extensions {
|
||||
logger.Log.Infof("Extension Properties: %+v", extension)
|
||||
}
|
||||
|
||||
controllers.InitExtensionsMap(ExtensionsMap)
|
||||
}
|
||||
|
||||
func ConfigureBasenineServer(host string, port string) {
|
||||
if !wait.New(
|
||||
wait.WithProto("tcp"),
|
||||
wait.WithWait(200*time.Millisecond),
|
||||
wait.WithBreak(50*time.Millisecond),
|
||||
wait.WithDeadline(5*time.Second),
|
||||
wait.WithDebug(config.Config.LogLevel == logging.DEBUG),
|
||||
).Do([]string{fmt.Sprintf("%s:%s", host, port)}) {
|
||||
logger.Log.Panicf("Basenine is not available!")
|
||||
}
|
||||
|
||||
// Limit the database size to default 200MB
|
||||
err := basenine.Limit(host, port, config.Config.MaxDBSizeBytes)
|
||||
if err != nil {
|
||||
logger.Log.Panicf("Error while limiting database size: %v", err)
|
||||
}
|
||||
|
||||
// Define the macros
|
||||
for _, extension := range Extensions {
|
||||
macros := extension.Dissector.Macros()
|
||||
for macro, expanded := range macros {
|
||||
err = basenine.Macro(host, port, macro, expanded)
|
||||
if err != nil {
|
||||
logger.Log.Panicf("Error while adding a macro: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func GetEntryInputChannel() chan *tapApi.OutputChannelItem {
|
||||
outputItemsChannel := make(chan *tapApi.OutputChannelItem)
|
||||
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
|
||||
go FilterItems(outputItemsChannel, filteredOutputItemsChannel)
|
||||
go api.StartReadingEntries(filteredOutputItemsChannel, nil, ExtensionsMap)
|
||||
|
||||
return outputItemsChannel
|
||||
}
|
||||
|
||||
func FilterItems(inChannel <-chan *tapApi.OutputChannelItem, outChannel chan *tapApi.OutputChannelItem) {
|
||||
for message := range inChannel {
|
||||
if message.ConnectionInfo.IsOutgoing && api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) {
|
||||
continue
|
||||
}
|
||||
|
||||
outChannel <- message
|
||||
}
|
||||
}
|
||||
@@ -339,7 +339,7 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
|
||||
continue
|
||||
}
|
||||
|
||||
logger.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase)
|
||||
logger.Log.Debugf("Watching API Server pod loop, modified: %v, containers statuses: %v", modifiedPod.Status.Phase, modifiedPod.Status.ContainerStatuses)
|
||||
|
||||
if modifiedPod.Status.Phase == core.PodRunning && !isPodReady {
|
||||
isPodReady = true
|
||||
@@ -406,7 +406,7 @@ func watchApiServerEvents(ctx context.Context, kubernetesProvider *kubernetes.Pr
|
||||
case "FailedScheduling", "Failed":
|
||||
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Mizu API Server status: %s - %s", event.Reason, event.Note))
|
||||
cancel()
|
||||
|
||||
|
||||
}
|
||||
case err, ok := <-errorChan:
|
||||
if !ok {
|
||||
|
||||
50
cli/go.mod
50
cli/go.mod
@@ -1,6 +1,6 @@
|
||||
module github.com/up9inc/mizu/cli
|
||||
|
||||
go 1.16
|
||||
go 1.17
|
||||
|
||||
require (
|
||||
github.com/creasty/defaults v1.5.1
|
||||
@@ -20,6 +20,54 @@ require (
|
||||
k8s.io/client-go v0.22.3
|
||||
)
|
||||
|
||||
require (
|
||||
cloud.google.com/go v0.54.0 // indirect
|
||||
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
|
||||
github.com/Azure/go-autorest/autorest v0.11.18 // indirect
|
||||
github.com/Azure/go-autorest/autorest/adal v0.9.13 // indirect
|
||||
github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect
|
||||
github.com/Azure/go-autorest/logger v0.2.1 // indirect
|
||||
github.com/Azure/go-autorest/tracing v0.6.0 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/docker/go-units v0.4.0 // indirect
|
||||
github.com/form3tech-oss/jwt-go v3.2.3+incompatible // indirect
|
||||
github.com/ghodss/yaml v1.0.0 // indirect
|
||||
github.com/go-logr/logr v0.4.0 // indirect
|
||||
github.com/go-openapi/jsonpointer v0.19.5 // indirect
|
||||
github.com/go-openapi/swag v0.19.5 // indirect
|
||||
github.com/gogo/protobuf v1.3.2 // indirect
|
||||
github.com/golang-jwt/jwt/v4 v4.1.0 // indirect
|
||||
github.com/golang/protobuf v1.5.2 // indirect
|
||||
github.com/google/go-cmp v0.5.6 // indirect
|
||||
github.com/google/go-querystring v1.0.0 // indirect
|
||||
github.com/google/gofuzz v1.1.0 // indirect
|
||||
github.com/google/martian v2.1.0+incompatible // indirect
|
||||
github.com/googleapis/gnostic v0.5.5 // indirect
|
||||
github.com/imdario/mergo v0.3.5 // indirect
|
||||
github.com/inconshreveable/mousetrap v1.0.0 // indirect
|
||||
github.com/json-iterator/go v1.1.11 // indirect
|
||||
github.com/mailru/easyjson v0.7.0 // indirect
|
||||
github.com/moby/spdystream v0.2.0 // indirect
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
||||
github.com/modern-go/reflect2 v1.0.1 // indirect
|
||||
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
|
||||
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83 // indirect
|
||||
golang.org/x/net v0.0.0-20210520170846-37e1c6afe023 // indirect
|
||||
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22 // indirect
|
||||
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d // indirect
|
||||
golang.org/x/text v0.3.6 // indirect
|
||||
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect
|
||||
google.golang.org/appengine v1.6.5 // indirect
|
||||
google.golang.org/protobuf v1.26.0 // indirect
|
||||
gopkg.in/inf.v0 v0.9.1 // indirect
|
||||
gopkg.in/yaml.v2 v2.4.0 // indirect
|
||||
k8s.io/klog/v2 v2.9.0 // indirect
|
||||
k8s.io/kubectl v0.21.2 // indirect
|
||||
k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a // indirect
|
||||
sigs.k8s.io/structured-merge-diff/v4 v4.1.2 // indirect
|
||||
sigs.k8s.io/yaml v1.2.0 // indirect
|
||||
)
|
||||
|
||||
replace github.com/up9inc/mizu/shared v0.0.0 => ../shared
|
||||
|
||||
replace github.com/up9inc/mizu/tap/api v0.0.0 => ../tap/api
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
FROM dockcross/linux-arm64-musl:latest AS builder-from-amd64-to-arm64v8
|
||||
|
||||
# Install Go
|
||||
RUN curl https://go.dev/dl/go1.16.13.linux-amd64.tar.gz -Lo ./go.linux-amd64.tar.gz
|
||||
RUN curl https://go.dev/dl/go1.16.13.linux-amd64.tar.gz.asc -Lo ./go.linux-amd64.tar.gz.asc
|
||||
RUN curl https://go.dev/dl/go1.17.6.linux-amd64.tar.gz -Lo ./go.linux-amd64.tar.gz
|
||||
RUN curl https://go.dev/dl/go1.17.6.linux-amd64.tar.gz.asc -Lo ./go.linux-amd64.tar.gz.asc
|
||||
RUN curl https://dl.google.com/dl/linux/linux_signing_key.pub -Lo linux_signing_key.pub
|
||||
RUN gpg --import linux_signing_key.pub && gpg --verify ./go.linux-amd64.tar.gz.asc ./go.linux-amd64.tar.gz
|
||||
RUN rm -rf /usr/local/go && tar -C /usr/local -xzf go.linux-amd64.tar.gz
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
module github.com/up9inc/mizu/shared
|
||||
|
||||
go 1.16
|
||||
go 1.17
|
||||
|
||||
require (
|
||||
github.com/docker/go-units v0.4.0
|
||||
@@ -14,4 +14,46 @@ require (
|
||||
k8s.io/kubectl v0.21.2
|
||||
)
|
||||
|
||||
require (
|
||||
cloud.google.com/go v0.54.0 // indirect
|
||||
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
|
||||
github.com/Azure/go-autorest/autorest v0.11.12 // indirect
|
||||
github.com/Azure/go-autorest/autorest/adal v0.9.5 // indirect
|
||||
github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect
|
||||
github.com/Azure/go-autorest/logger v0.2.0 // indirect
|
||||
github.com/Azure/go-autorest/tracing v0.6.0 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/form3tech-oss/jwt-go v3.2.2+incompatible // indirect
|
||||
github.com/go-logr/logr v0.4.0 // indirect
|
||||
github.com/gogo/protobuf v1.3.2 // indirect
|
||||
github.com/golang/protobuf v1.4.3 // indirect
|
||||
github.com/google/go-cmp v0.5.4 // indirect
|
||||
github.com/google/gofuzz v1.1.0 // indirect
|
||||
github.com/google/martian v2.1.0+incompatible // indirect
|
||||
github.com/googleapis/gnostic v0.4.1 // indirect
|
||||
github.com/hashicorp/golang-lru v0.5.1 // indirect
|
||||
github.com/imdario/mergo v0.3.5 // indirect
|
||||
github.com/json-iterator/go v1.1.10 // indirect
|
||||
github.com/moby/spdystream v0.2.0 // indirect
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
||||
github.com/modern-go/reflect2 v1.0.1 // indirect
|
||||
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
|
||||
github.com/spf13/pflag v1.0.5 // indirect
|
||||
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83 // indirect
|
||||
golang.org/x/net v0.0.0-20210224082022-3d97a244fca7 // indirect
|
||||
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d // indirect
|
||||
golang.org/x/sys v0.0.0-20210426230700-d19ff857e887 // indirect
|
||||
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d // indirect
|
||||
golang.org/x/text v0.3.4 // indirect
|
||||
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
|
||||
google.golang.org/appengine v1.6.5 // indirect
|
||||
google.golang.org/protobuf v1.25.0 // indirect
|
||||
gopkg.in/inf.v0 v0.9.1 // indirect
|
||||
gopkg.in/yaml.v2 v2.4.0 // indirect
|
||||
k8s.io/klog/v2 v2.8.0 // indirect
|
||||
k8s.io/utils v0.0.0-20201110183641-67b214c5f920 // indirect
|
||||
sigs.k8s.io/structured-merge-diff/v4 v4.1.0 // indirect
|
||||
sigs.k8s.io/yaml v1.2.0 // indirect
|
||||
)
|
||||
|
||||
replace github.com/up9inc/mizu/tap/api v0.0.0 => ../tap/api
|
||||
|
||||
@@ -29,12 +29,10 @@ func (wh *EventWatchHelper) Filter(wEvent *WatchEvent) (bool, error) {
|
||||
if err != nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if !wh.NameRegexFilter.MatchString(event.Name) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if strings.EqualFold(event.Regarding.Kind, wh.Kind) {
|
||||
if !strings.EqualFold(event.Regarding.Kind, wh.Kind) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -58,8 +58,10 @@ type TcpID struct {
|
||||
}
|
||||
|
||||
type CounterPair struct {
|
||||
StreamId int64
|
||||
Request uint
|
||||
Response uint
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
type GenericMessage struct {
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
module github.com/up9inc/mizu/tap/api
|
||||
|
||||
go 1.16
|
||||
go 1.17
|
||||
|
||||
require github.com/google/martian v2.1.0+incompatible
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
module github.com/up9inc/mizu/tap/extensions/amqp
|
||||
|
||||
go 1.16
|
||||
go 1.17
|
||||
|
||||
require github.com/up9inc/mizu/tap/api v0.0.0
|
||||
|
||||
require github.com/google/martian v2.1.0+incompatible // indirect
|
||||
|
||||
replace github.com/up9inc/mizu/tap/api v0.0.0 => ../../api
|
||||
|
||||
@@ -362,7 +362,7 @@ func representBasicDeliver(event map[string]interface{}) []interface{} {
|
||||
for name, value := range properties["headers"].(map[string]interface{}) {
|
||||
headers = append(headers, api.TableData{
|
||||
Name: name,
|
||||
Value: value.(string),
|
||||
Value: value,
|
||||
Selector: fmt.Sprintf(`request.properties.headers["%s"]`, name),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1,11 +1,15 @@
|
||||
module github.com/up9inc/mizu/tap/extensions/http
|
||||
|
||||
go 1.16
|
||||
go 1.17
|
||||
|
||||
require (
|
||||
github.com/beevik/etree v1.1.0
|
||||
github.com/up9inc/mizu/tap/api v0.0.0
|
||||
golang.org/x/net v0.0.0-20210224082022-3d97a244fca7
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/google/martian v2.1.0+incompatible // indirect
|
||||
golang.org/x/text v0.3.5 // indirect
|
||||
)
|
||||
|
||||
|
||||
@@ -115,7 +115,10 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
counterPair.Lock()
|
||||
counterPair.Request++
|
||||
requestCounter := counterPair.Request
|
||||
counterPair.Unlock()
|
||||
|
||||
// Check HTTP2 upgrade - HTTP2 Over Cleartext (H2C)
|
||||
if strings.Contains(strings.ToLower(req.Header.Get("Connection")), "upgrade") && strings.ToLower(req.Header.Get("Upgrade")) == "h2c" {
|
||||
@@ -127,12 +130,13 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
|
||||
req.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
|
||||
|
||||
ident := fmt.Sprintf(
|
||||
"%s->%s %s->%s %d %s",
|
||||
"%d_%s:%s_%s:%s_%d_%s",
|
||||
counterPair.StreamId,
|
||||
tcpID.SrcIP,
|
||||
tcpID.DstIP,
|
||||
tcpID.SrcPort,
|
||||
tcpID.DstPort,
|
||||
counterPair.Request,
|
||||
requestCounter,
|
||||
"HTTP1",
|
||||
)
|
||||
item := reqResMatcher.registerRequest(ident, req, superTimer.CaptureTime, req.ProtoMinor)
|
||||
@@ -155,7 +159,10 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
counterPair.Lock()
|
||||
counterPair.Response++
|
||||
responseCounter := counterPair.Response
|
||||
counterPair.Unlock()
|
||||
|
||||
// Check HTTP2 upgrade - HTTP2 Over Cleartext (H2C)
|
||||
if res.StatusCode == 101 && strings.Contains(strings.ToLower(res.Header.Get("Connection")), "upgrade") && strings.ToLower(res.Header.Get("Upgrade")) == "h2c" {
|
||||
@@ -167,12 +174,13 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
|
||||
res.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
|
||||
|
||||
ident := fmt.Sprintf(
|
||||
"%s->%s %s->%s %d %s",
|
||||
"%d_%s:%s_%s:%s_%d_%s",
|
||||
counterPair.StreamId,
|
||||
tcpID.DstIP,
|
||||
tcpID.SrcIP,
|
||||
tcpID.DstPort,
|
||||
tcpID.SrcPort,
|
||||
counterPair.Response,
|
||||
responseCounter,
|
||||
"HTTP1",
|
||||
)
|
||||
item := reqResMatcher.registerResponse(ident, res, superTimer.CaptureTime, res.ProtoMinor)
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
package http
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -23,9 +21,6 @@ func createResponseRequestMatcher() requestResponseMatcher {
|
||||
}
|
||||
|
||||
func (matcher *requestResponseMatcher) registerRequest(ident string, request *http.Request, captureTime time.Time, protoMinor int) *api.OutputChannelItem {
|
||||
split := splitIdent(ident)
|
||||
key := genKey(split)
|
||||
|
||||
requestHTTPMessage := api.GenericMessage{
|
||||
IsRequest: true,
|
||||
CaptureTime: captureTime,
|
||||
@@ -35,7 +30,7 @@ func (matcher *requestResponseMatcher) registerRequest(ident string, request *ht
|
||||
},
|
||||
}
|
||||
|
||||
if response, found := matcher.openMessagesMap.LoadAndDelete(key); found {
|
||||
if response, found := matcher.openMessagesMap.LoadAndDelete(ident); found {
|
||||
// Type assertion always succeeds because all of the map's values are of api.GenericMessage type
|
||||
responseHTTPMessage := response.(*api.GenericMessage)
|
||||
if responseHTTPMessage.IsRequest {
|
||||
@@ -44,14 +39,11 @@ func (matcher *requestResponseMatcher) registerRequest(ident string, request *ht
|
||||
return matcher.preparePair(&requestHTTPMessage, responseHTTPMessage, protoMinor)
|
||||
}
|
||||
|
||||
matcher.openMessagesMap.Store(key, &requestHTTPMessage)
|
||||
matcher.openMessagesMap.Store(ident, &requestHTTPMessage)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (matcher *requestResponseMatcher) registerResponse(ident string, response *http.Response, captureTime time.Time, protoMinor int) *api.OutputChannelItem {
|
||||
split := splitIdent(ident)
|
||||
key := genKey(split)
|
||||
|
||||
responseHTTPMessage := api.GenericMessage{
|
||||
IsRequest: false,
|
||||
CaptureTime: captureTime,
|
||||
@@ -61,7 +53,7 @@ func (matcher *requestResponseMatcher) registerResponse(ident string, response *
|
||||
},
|
||||
}
|
||||
|
||||
if request, found := matcher.openMessagesMap.LoadAndDelete(key); found {
|
||||
if request, found := matcher.openMessagesMap.LoadAndDelete(ident); found {
|
||||
// Type assertion always succeeds because all of the map's values are of api.GenericMessage type
|
||||
requestHTTPMessage := request.(*api.GenericMessage)
|
||||
if !requestHTTPMessage.IsRequest {
|
||||
@@ -70,7 +62,7 @@ func (matcher *requestResponseMatcher) registerResponse(ident string, response *
|
||||
return matcher.preparePair(requestHTTPMessage, &responseHTTPMessage, protoMinor)
|
||||
}
|
||||
|
||||
matcher.openMessagesMap.Store(key, &responseHTTPMessage)
|
||||
matcher.openMessagesMap.Store(ident, &responseHTTPMessage)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -89,13 +81,3 @@ func (matcher *requestResponseMatcher) preparePair(requestHTTPMessage *api.Gener
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func splitIdent(ident string) []string {
|
||||
ident = strings.Replace(ident, "->", " ", -1)
|
||||
return strings.Split(ident, " ")
|
||||
}
|
||||
|
||||
func genKey(split []string) string {
|
||||
key := fmt.Sprintf("%s:%s->%s:%s,%s%s", split[0], split[2], split[1], split[3], split[4], split[5])
|
||||
return key
|
||||
}
|
||||
|
||||
@@ -1,13 +1,19 @@
|
||||
module github.com/up9inc/mizu/tap/extensions/kafka
|
||||
|
||||
go 1.16
|
||||
go 1.17
|
||||
|
||||
require (
|
||||
github.com/fatih/camelcase v1.0.0
|
||||
github.com/klauspost/compress v1.14.1 // indirect
|
||||
github.com/ohler55/ojg v1.12.12
|
||||
github.com/segmentio/kafka-go v0.4.27
|
||||
github.com/up9inc/mizu/tap/api v0.0.0
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/golang/snappy v0.0.1 // indirect
|
||||
github.com/google/martian v2.1.0+incompatible // indirect
|
||||
github.com/klauspost/compress v1.14.1 // indirect
|
||||
github.com/pierrec/lz4 v2.6.0+incompatible // indirect
|
||||
)
|
||||
|
||||
replace github.com/up9inc/mizu/tap/api v0.0.0 => ../../api
|
||||
|
||||
@@ -368,24 +368,26 @@ func representProduceRequest(data map[string]interface{}) []interface{} {
|
||||
}
|
||||
recordsResults := recordsPath.Get(obj)
|
||||
if len(recordsResults) > 0 {
|
||||
records := recordsResults[0].([]interface{})
|
||||
for i, _record := range records {
|
||||
record := _record.(map[string]interface{})
|
||||
value := record["value"]
|
||||
delete(record, "value")
|
||||
if recordsResults[0] != nil {
|
||||
records := recordsResults[0].([]interface{})
|
||||
for i, _record := range records {
|
||||
record := _record.(map[string]interface{})
|
||||
value := record["value"]
|
||||
delete(record, "value")
|
||||
|
||||
rep = append(rep, api.SectionData{
|
||||
Type: api.TABLE,
|
||||
Title: fmt.Sprintf("Record [%d] Details (topic: %s)", i, topicName),
|
||||
Data: representMapAsTable(record, fmt.Sprintf(`request.payload.topicData.partitions.partitionData.records.recordBatch.record[%d]`, i), []string{"value"}),
|
||||
})
|
||||
rep = append(rep, api.SectionData{
|
||||
Type: api.TABLE,
|
||||
Title: fmt.Sprintf("Record [%d] Details (topic: %s)", i, topicName),
|
||||
Data: representMapAsTable(record, fmt.Sprintf(`request.payload.topicData.partitions.partitionData.records.recordBatch.record[%d]`, i), []string{"value"}),
|
||||
})
|
||||
|
||||
rep = append(rep, api.SectionData{
|
||||
Type: api.BODY,
|
||||
Title: fmt.Sprintf("Record [%d] Value", i),
|
||||
Data: value.(string),
|
||||
Selector: fmt.Sprintf(`request.payload.topicData.partitions.partitionData.records.recordBatch.record[%d].value`, i),
|
||||
})
|
||||
rep = append(rep, api.SectionData{
|
||||
Type: api.BODY,
|
||||
Title: fmt.Sprintf("Record [%d] Value", i),
|
||||
Data: value.(string),
|
||||
Selector: fmt.Sprintf(`request.payload.topicData.partitions.partitionData.records.recordBatch.record[%d].value`, i),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -614,22 +616,24 @@ func representFetchResponse(data map[string]interface{}) []interface{} {
|
||||
Data: representMapAsTable(recordBatch, fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d].recordSet.recordBatch`, i, j), []string{"record"}),
|
||||
})
|
||||
|
||||
for k, _record := range recordBatch["record"].([]interface{}) {
|
||||
record := _record.(map[string]interface{})
|
||||
value := record["value"]
|
||||
if recordBatch["record"] != nil {
|
||||
for k, _record := range recordBatch["record"].([]interface{}) {
|
||||
record := _record.(map[string]interface{})
|
||||
value := record["value"]
|
||||
|
||||
rep = append(rep, api.SectionData{
|
||||
Type: api.TABLE,
|
||||
Title: fmt.Sprintf("Response [%d] Partition Response [%d] Record [%d] (topic: %s)", i, j, k, topicName),
|
||||
Data: representMapAsTable(record, fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d].recordSet.recordBatch.record[%d]`, i, j, k), []string{"value"}),
|
||||
})
|
||||
rep = append(rep, api.SectionData{
|
||||
Type: api.TABLE,
|
||||
Title: fmt.Sprintf("Response [%d] Partition Response [%d] Record [%d] (topic: %s)", i, j, k, topicName),
|
||||
Data: representMapAsTable(record, fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d].recordSet.recordBatch.record[%d]`, i, j, k), []string{"value"}),
|
||||
})
|
||||
|
||||
rep = append(rep, api.SectionData{
|
||||
Type: api.BODY,
|
||||
Title: fmt.Sprintf("Response [%d] Partition Response [%d] Record [%d] Value (topic: %s)", i, j, k, topicName),
|
||||
Data: value.(string),
|
||||
Selector: fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d].recordSet.recordBatch.record[%d].value`, i, j, k),
|
||||
})
|
||||
rep = append(rep, api.SectionData{
|
||||
Type: api.BODY,
|
||||
Title: fmt.Sprintf("Response [%d] Partition Response [%d] Record [%d] Value (topic: %s)", i, j, k, topicName),
|
||||
Data: value.(string),
|
||||
Selector: fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d].recordSet.recordBatch.record[%d].value`, i, j, k),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -730,6 +734,9 @@ func representCreateTopicsRequest(data map[string]interface{}) []interface{} {
|
||||
Data: string(repPayload),
|
||||
})
|
||||
|
||||
if payload["topics"] == nil {
|
||||
return rep
|
||||
}
|
||||
for i, _topic := range payload["topics"].([]interface{}) {
|
||||
topic := _topic.(map[string]interface{})
|
||||
|
||||
@@ -766,6 +773,9 @@ func representCreateTopicsResponse(data map[string]interface{}) []interface{} {
|
||||
Data: string(repPayload),
|
||||
})
|
||||
|
||||
if payload["topics"] == nil {
|
||||
return rep
|
||||
}
|
||||
for i, _topic := range payload["topics"].([]interface{}) {
|
||||
topic := _topic.(map[string]interface{})
|
||||
|
||||
|
||||
@@ -47,13 +47,13 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
}
|
||||
|
||||
if isClient {
|
||||
_, _, err := ReadRequest(b, tcpID, superTimer)
|
||||
_, _, err := ReadRequest(b, tcpID, counterPair, superTimer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
superIdentifier.Protocol = &_protocol
|
||||
} else {
|
||||
err := ReadResponse(b, tcpID, superTimer, emitter)
|
||||
err := ReadResponse(b, tcpID, counterPair, superTimer, emitter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -120,7 +120,11 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
||||
summary = summary[:len(summary)-2]
|
||||
}
|
||||
case CreateTopics:
|
||||
topics := reqDetails["payload"].(map[string]interface{})["topics"].([]interface{})
|
||||
_topics := reqDetails["payload"].(map[string]interface{})["topics"]
|
||||
if _topics == nil {
|
||||
break
|
||||
}
|
||||
topics := _topics.([]interface{})
|
||||
for _, topic := range topics {
|
||||
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["name"].(string))
|
||||
}
|
||||
@@ -128,6 +132,9 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
||||
summary = summary[:len(summary)-2]
|
||||
}
|
||||
case DeleteTopics:
|
||||
if reqDetails["topicNames"] == nil {
|
||||
break
|
||||
}
|
||||
topicNames := reqDetails["topicNames"].([]string)
|
||||
for _, name := range topicNames {
|
||||
summary += fmt.Sprintf("%s, ", name)
|
||||
|
||||
@@ -19,7 +19,7 @@ type Request struct {
|
||||
CaptureTime time.Time `json:"captureTime"`
|
||||
}
|
||||
|
||||
func ReadRequest(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer) (apiKey ApiKey, apiVersion int16, err error) {
|
||||
func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer) (apiKey ApiKey, apiVersion int16, err error) {
|
||||
d := &decoder{reader: r, remain: 4}
|
||||
size := d.readInt32()
|
||||
|
||||
@@ -214,7 +214,8 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer) (api
|
||||
}
|
||||
|
||||
key := fmt.Sprintf(
|
||||
"%s:%s->%s:%s::%d",
|
||||
"%d_%s:%s_%s:%s_%d",
|
||||
counterPair.StreamId,
|
||||
tcpID.SrcIP,
|
||||
tcpID.SrcPort,
|
||||
tcpID.DstIP,
|
||||
|
||||
@@ -16,7 +16,7 @@ type Response struct {
|
||||
CaptureTime time.Time `json:"captureTime"`
|
||||
}
|
||||
|
||||
func ReadResponse(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter) (err error) {
|
||||
func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) (err error) {
|
||||
d := &decoder{reader: r, remain: 4}
|
||||
size := d.readInt32()
|
||||
|
||||
@@ -44,7 +44,8 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer, emi
|
||||
}
|
||||
|
||||
key := fmt.Sprintf(
|
||||
"%s:%s->%s:%s::%d",
|
||||
"%d_%s:%s_%s:%s_%d",
|
||||
counterPair.StreamId,
|
||||
tcpID.DstIP,
|
||||
tcpID.DstPort,
|
||||
tcpID.SrcIP,
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
module github.com/up9inc/mizu/tap/extensions/redis
|
||||
|
||||
go 1.16
|
||||
go 1.17
|
||||
|
||||
require github.com/up9inc/mizu/tap/api v0.0.0
|
||||
|
||||
require github.com/google/martian v2.1.0+incompatible // indirect
|
||||
|
||||
replace github.com/up9inc/mizu/tap/api v0.0.0 => ../../api
|
||||
|
||||
@@ -7,15 +7,21 @@ import (
|
||||
)
|
||||
|
||||
func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket) error {
|
||||
counterPair.Lock()
|
||||
counterPair.Request++
|
||||
requestCounter := counterPair.Request
|
||||
counterPair.Unlock()
|
||||
|
||||
ident := fmt.Sprintf(
|
||||
"%s->%s %s->%s %d",
|
||||
"%d_%s:%s_%s:%s_%d",
|
||||
counterPair.StreamId,
|
||||
tcpID.SrcIP,
|
||||
tcpID.DstIP,
|
||||
tcpID.SrcPort,
|
||||
tcpID.DstPort,
|
||||
counterPair.Request,
|
||||
requestCounter,
|
||||
)
|
||||
|
||||
item := reqResMatcher.registerRequest(ident, request, superTimer.CaptureTime)
|
||||
if item != nil {
|
||||
item.ConnectionInfo = &api.ConnectionInfo{
|
||||
@@ -31,15 +37,21 @@ func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTim
|
||||
}
|
||||
|
||||
func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket) error {
|
||||
counterPair.Lock()
|
||||
counterPair.Response++
|
||||
responseCounter := counterPair.Response
|
||||
counterPair.Unlock()
|
||||
|
||||
ident := fmt.Sprintf(
|
||||
"%s->%s %s->%s %d",
|
||||
"%d_%s:%s_%s:%s_%d",
|
||||
counterPair.StreamId,
|
||||
tcpID.DstIP,
|
||||
tcpID.SrcIP,
|
||||
tcpID.DstPort,
|
||||
tcpID.SrcPort,
|
||||
counterPair.Response,
|
||||
responseCounter,
|
||||
)
|
||||
|
||||
item := reqResMatcher.registerResponse(ident, response, superTimer.CaptureTime)
|
||||
if item != nil {
|
||||
item.ConnectionInfo = &api.ConnectionInfo{
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
package redis
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -11,7 +9,7 @@ import (
|
||||
|
||||
var reqResMatcher = createResponseRequestMatcher() // global
|
||||
|
||||
// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port}_{incremental_counter}
|
||||
// Key is `{stream_id}_{src_ip}:{dst_ip}_{src_ip}:{src_port}_{incremental_counter}`
|
||||
type requestResponseMatcher struct {
|
||||
openMessagesMap *sync.Map
|
||||
}
|
||||
@@ -22,9 +20,6 @@ func createResponseRequestMatcher() requestResponseMatcher {
|
||||
}
|
||||
|
||||
func (matcher *requestResponseMatcher) registerRequest(ident string, request *RedisPacket, captureTime time.Time) *api.OutputChannelItem {
|
||||
split := splitIdent(ident)
|
||||
key := genKey(split)
|
||||
|
||||
requestRedisMessage := api.GenericMessage{
|
||||
IsRequest: true,
|
||||
CaptureTime: captureTime,
|
||||
@@ -37,7 +32,7 @@ func (matcher *requestResponseMatcher) registerRequest(ident string, request *Re
|
||||
},
|
||||
}
|
||||
|
||||
if response, found := matcher.openMessagesMap.LoadAndDelete(key); found {
|
||||
if response, found := matcher.openMessagesMap.LoadAndDelete(ident); found {
|
||||
// Type assertion always succeeds because all of the map's values are of api.GenericMessage type
|
||||
responseRedisMessage := response.(*api.GenericMessage)
|
||||
if responseRedisMessage.IsRequest {
|
||||
@@ -46,14 +41,11 @@ func (matcher *requestResponseMatcher) registerRequest(ident string, request *Re
|
||||
return matcher.preparePair(&requestRedisMessage, responseRedisMessage)
|
||||
}
|
||||
|
||||
matcher.openMessagesMap.Store(key, &requestRedisMessage)
|
||||
matcher.openMessagesMap.Store(ident, &requestRedisMessage)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (matcher *requestResponseMatcher) registerResponse(ident string, response *RedisPacket, captureTime time.Time) *api.OutputChannelItem {
|
||||
split := splitIdent(ident)
|
||||
key := genKey(split)
|
||||
|
||||
responseRedisMessage := api.GenericMessage{
|
||||
IsRequest: false,
|
||||
CaptureTime: captureTime,
|
||||
@@ -66,7 +58,7 @@ func (matcher *requestResponseMatcher) registerResponse(ident string, response *
|
||||
},
|
||||
}
|
||||
|
||||
if request, found := matcher.openMessagesMap.LoadAndDelete(key); found {
|
||||
if request, found := matcher.openMessagesMap.LoadAndDelete(ident); found {
|
||||
// Type assertion always succeeds because all of the map's values are of api.GenericMessage type
|
||||
requestRedisMessage := request.(*api.GenericMessage)
|
||||
if !requestRedisMessage.IsRequest {
|
||||
@@ -75,7 +67,7 @@ func (matcher *requestResponseMatcher) registerResponse(ident string, response *
|
||||
return matcher.preparePair(requestRedisMessage, &responseRedisMessage)
|
||||
}
|
||||
|
||||
matcher.openMessagesMap.Store(key, &responseRedisMessage)
|
||||
matcher.openMessagesMap.Store(ident, &responseRedisMessage)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -90,13 +82,3 @@ func (matcher *requestResponseMatcher) preparePair(requestRedisMessage *api.Gene
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func splitIdent(ident string) []string {
|
||||
ident = strings.Replace(ident, "->", " ", -1)
|
||||
return strings.Split(ident, " ")
|
||||
}
|
||||
|
||||
func genKey(split []string) string {
|
||||
key := fmt.Sprintf("%s:%s->%s:%s,%s", split[0], split[2], split[1], split[3], split[4])
|
||||
return key
|
||||
}
|
||||
|
||||
22
tap/go.mod
22
tap/go.mod
@@ -1,6 +1,6 @@
|
||||
module github.com/up9inc/mizu/tap
|
||||
|
||||
go 1.16
|
||||
go 1.17
|
||||
|
||||
require (
|
||||
github.com/bradleyfalzon/tlsx v0.0.0-20170624122154-28fd0e59bac4
|
||||
@@ -11,6 +11,26 @@ require (
|
||||
k8s.io/api v0.21.2
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/go-logr/logr v0.4.0 // indirect
|
||||
github.com/gogo/protobuf v1.3.2 // indirect
|
||||
github.com/google/go-cmp v0.5.4 // indirect
|
||||
github.com/google/gofuzz v1.1.0 // indirect
|
||||
github.com/google/martian v2.1.0+incompatible // indirect
|
||||
github.com/json-iterator/go v1.1.10 // indirect
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
||||
github.com/modern-go/reflect2 v1.0.1 // indirect
|
||||
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 // indirect
|
||||
golang.org/x/net v0.0.0-20210224082022-3d97a244fca7 // indirect
|
||||
golang.org/x/sys v0.0.0-20210426230700-d19ff857e887 // indirect
|
||||
golang.org/x/text v0.3.4 // indirect
|
||||
gopkg.in/inf.v0 v0.9.1 // indirect
|
||||
gopkg.in/yaml.v2 v2.4.0 // indirect
|
||||
k8s.io/apimachinery v0.21.2 // indirect
|
||||
k8s.io/klog/v2 v2.8.0 // indirect
|
||||
sigs.k8s.io/structured-merge-diff/v4 v4.1.0 // indirect
|
||||
)
|
||||
|
||||
replace github.com/up9inc/mizu/tap/api v0.0.0 => ./api
|
||||
|
||||
replace github.com/up9inc/mizu/shared v0.0.0 => ../shared
|
||||
|
||||
@@ -82,6 +82,7 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
|
||||
stream.id = factory.streamsMap.nextId()
|
||||
for i, extension := range extensions {
|
||||
counterPair := &api.CounterPair{
|
||||
StreamId: stream.id,
|
||||
Request: 0,
|
||||
Response: 0,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user