Compare commits

...

11 Commits

Author SHA1 Message Date
M. Mert Yıldıran
1de50b0572 Fix the request-response matcher maps iteration in clean() method and share the streams map with the TLS tapper (#1059)
* Fix `panic: interface conversion: api.RequestResponseMatcher is nil, not *http.requestResponseMatcher` error

Also fix the request-response matcher maps iteration in `clean()` method.

* Fix the mocks in the unit tests

* Remove unnecessary fields from `tlsPoller` and implement `SetProtocol` method

* Use concrete types in `tap` package

* Share the streams map with the TLS tapper

* Check interface conversion error
2022-05-01 16:16:22 +03:00
AmitUp9
0881dad17f update craco config to resolve material ui instances (#1060) 2022-05-01 16:05:53 +03:00
David Levanon
cade960b9b Fix tls + creating tls_stream (#1058) 2022-05-01 14:46:31 +03:00
M. Mert Yıldıran
d3e6a69d82 Refactor tap module to achieve synchronously closing other protocol dissectors upon identification (#1026)
* Remove `tcpStreamWrapper` struct

* Refactor `tap` module and move some of the code to `tap/api` module

* Move `TrafficFilteringOptions` struct to `shared` module

* Change the `Dissect` method signature to have `*TcpReader` as an argument

* Add `CloseOtherProtocolDissectors` method and use it to synchronously close the other protocol dissectors

* Run `go mod tidy` in `cli` module

* Rename `SuperIdentifier` struct to `ProtoIdentifier`

* Remove `SuperTimer` struct

* Bring back `CloseTimedoutTcpStreamChannels` method

* Run `go mod tidy` everywhere

* Remove `GOGC` environment variable from tapper

* Fix the tests

* Bring back `debug.FreeOSMemory()` call

* Make `CloseOtherProtocolDissectors` method mutexed

* Revert "Remove `GOGC` environment variable from tapper"

This reverts commit cfc2484bbb.

* Bring back the removed `checksum`, `nooptcheck` and `ignorefsmerr` flags

* Define a bunch of interfaces and don't export any new structs from `tap/api`

* Keep the interfaces in `tap/api` but move the structs to `tap/tcp`

* Fix the unit tests by depending on `github.com/up9inc/mizu/tap`

* Use the modified `tlsEmitter`

* Define `TlsChunk` interface and make `tlsReader` implement `TcpReader`

* Remove unused fields in `tlsReader`

* Define `ReassemblyStream` interface and separate `gopacket` specififc fields to `tcpReassemblyStream` struct

Such that make `tap/api` don't depend on `gopacket`

* Remove the unused fields

* Make `tlsPoller` implement `TcpStream` interface and remove the call to `NewTcpStreamDummy` method

* Remove unused fields from `tlsPoller`

* Remove almost all of the setter methods in `TcpReader` and `TcpStream` interface and remove `TlsChunk` interface

* Revert "Revert "Remove `GOGC` environment variable from tapper""

This reverts commit ab2b9a803b.

* Revert "Bring back `debug.FreeOSMemory()` call"

This reverts commit 1cce863bbb.

* Remove excess comment

* Fix acceptance tests (`logger` module) #run_acceptance_tests

* Bring back `github.com/patrickmn/go-cache`

* Fix `NewTcpStream` method signature

* Put `tcpReader` and `tcpStream` mocks into protocol dissectors to remove `github.com/up9inc/mizu/tap` dependency

* Fix AMQP tests

* Revert 960ba644cd

* Revert `go.mod` and `go.sum` files in protocol dissectors

* Fix the comment position

* Revert `AppStatsInst` change

* Fix indent

* Fix CLI build

* Fix linter error

* Fix error msg

* Revert some of the changes in `chunk.go`
2022-04-28 17:19:14 +03:00
AmitUp9
ed9e162af0 validation that grpc error render only when needed (#1051) 2022-04-28 15:18:08 +03:00
Igor Gov
4e22e77597 Fix: remove agent unused go dependency (#1050) 2022-04-28 12:08:59 +03:00
AmitUp9
3978ace4ef searchable dropdown added to oas modal (#1044)
* searchable dropdown added to oas modal

* remove unnecessary attribute from autocomplete

* move css to sass file
2022-04-28 11:59:13 +03:00
Igor Gov
e71a12d399 Introducing eslint (#1048)
* Introducing eslint
2022-04-28 11:46:00 +03:00
M. Mert Yıldıran
90c54f9505 Fix acceptance tests (logger module) (#1049) 2022-04-27 23:59:16 +03:00
M. Mert Yıldıran
e1ad302c29 Make logger a separate module such that don't depend on shared module as a whole for logging (#1047)
* Make `logger` a separate module such that don't depend on `shared` module as a whole for logging

* Update `Dockerfile`
2022-04-27 22:26:27 +03:00
lirazyehezkel
ee8dce4466 Clear entry detailed on workspace change (#1045) 2022-04-26 15:40:42 +03:00
146 changed files with 1784 additions and 1535 deletions

View File

@@ -141,3 +141,42 @@ jobs:
with: with:
version: latest version: latest
working-directory: tap/extensions/redis working-directory: tap/extensions/redis
- uses: actions/setup-node@v2
with:
node-version: 16
- name: Check modified UI files
id: ui_modified_files
run: devops/check_modified_files.sh ui/
- name: ESLint prerequisites ui
if: steps.ui_modified_files.outputs.matched == 'true'
run: |
sudo npm install -g eslint
cd ui
npm run prestart
npm i
- name: ESLint ui
if: steps.ui_modified_files.outputs.matched == 'true'
run: |
cd ui
npm run eslint
- name: Check modified ui-common files
id: ui_common_modified_files
run: devops/check_modified_files.sh ui-common/
- name: ESLint prerequisites ui-common
if: steps.ui_common_modified_files.outputs.matched == 'true'
run: |
sudo npm install -g eslint
cd ui-common
npm i
- name: ESLint ui-common
if: steps.ui_common_modified_files.outputs.matched == 'true'
run: |
cd ui-common
npm run eslint

View File

@@ -58,6 +58,7 @@ WORKDIR /app/agent-build
COPY agent/go.mod agent/go.sum ./ COPY agent/go.mod agent/go.sum ./
COPY shared/go.mod shared/go.mod ../shared/ COPY shared/go.mod shared/go.mod ../shared/
COPY logger/go.mod logger/go.mod ../logger/
COPY tap/go.mod tap/go.mod ../tap/ COPY tap/go.mod tap/go.mod ../tap/
COPY tap/api/go.mod ../tap/api/ COPY tap/api/go.mod ../tap/api/
COPY tap/extensions/amqp/go.mod ../tap/extensions/amqp/ COPY tap/extensions/amqp/go.mod ../tap/extensions/amqp/
@@ -66,10 +67,12 @@ COPY tap/extensions/kafka/go.mod ../tap/extensions/kafka/
COPY tap/extensions/redis/go.mod ../tap/extensions/redis/ COPY tap/extensions/redis/go.mod ../tap/extensions/redis/
RUN go mod download RUN go mod download
# cheap trick to make the build faster (as long as go.mod did not change) # cheap trick to make the build faster (as long as go.mod did not change)
RUN go get github.com/patrickmn/go-cache
RUN go list -f '{{.Path}}@{{.Version}}' -m all | sed 1d | grep -e 'go-cache' | xargs go get RUN go list -f '{{.Path}}@{{.Version}}' -m all | sed 1d | grep -e 'go-cache' | xargs go get
# Copy and build agent code # Copy and build agent code
COPY shared ../shared COPY shared ../shared
COPY logger ../logger
COPY tap ../tap COPY tap ../tap
COPY agent . COPY agent .

View File

@@ -29,6 +29,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 // indirect github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 // indirect
github.com/spf13/pflag v1.0.5 // indirect github.com/spf13/pflag v1.0.5 // indirect
github.com/up9inc/mizu/logger v0.0.0 // indirect
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect
golang.org/x/sys v0.0.0-20220207234003-57398862261d // indirect golang.org/x/sys v0.0.0-20220207234003-57398862261d // indirect
@@ -48,6 +49,8 @@ require (
sigs.k8s.io/yaml v1.3.0 // indirect sigs.k8s.io/yaml v1.3.0 // indirect
) )
replace github.com/up9inc/mizu/logger v0.0.0 => ../logger
replace github.com/up9inc/mizu/shared v0.0.0 => ../shared replace github.com/up9inc/mizu/shared v0.0.0 => ../shared
replace github.com/up9inc/mizu/tap/api v0.0.0 => ../tap/api replace github.com/up9inc/mizu/tap/api v0.0.0 => ../tap/api

View File

@@ -19,9 +19,9 @@ require (
github.com/nav-inc/datetime v0.1.3 github.com/nav-inc/datetime v0.1.3
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
github.com/orcaman/concurrent-map v1.0.0 github.com/orcaman/concurrent-map v1.0.0
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/stretchr/testify v1.7.0 github.com/stretchr/testify v1.7.0
github.com/up9inc/basenine/client/go v0.0.0-20220419100955-e2ca51087607 github.com/up9inc/basenine/client/go v0.0.0-20220419100955-e2ca51087607
github.com/up9inc/mizu/logger v0.0.0
github.com/up9inc/mizu/shared v0.0.0 github.com/up9inc/mizu/shared v0.0.0
github.com/up9inc/mizu/tap v0.0.0 github.com/up9inc/mizu/tap v0.0.0
github.com/up9inc/mizu/tap/api v0.0.0 github.com/up9inc/mizu/tap/api v0.0.0
@@ -135,6 +135,8 @@ require (
sigs.k8s.io/yaml v1.3.0 // indirect sigs.k8s.io/yaml v1.3.0 // indirect
) )
replace github.com/up9inc/mizu/logger v0.0.0 => ../logger
replace github.com/up9inc/mizu/shared v0.0.0 => ../shared replace github.com/up9inc/mizu/shared v0.0.0 => ../shared
replace github.com/up9inc/mizu/tap v0.0.0 => ../tap replace github.com/up9inc/mizu/tap v0.0.0 => ../tap

View File

@@ -554,8 +554,6 @@ github.com/orcaman/concurrent-map v1.0.0 h1:I/2A2XPCb4IuQWcQhBhSwGfiuybl/J0ev9HD
github.com/orcaman/concurrent-map v1.0.0/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI= github.com/orcaman/concurrent-map v1.0.0/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pelletier/go-toml v1.9.3/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= github.com/pelletier/go-toml v1.9.3/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=

View File

@@ -33,8 +33,8 @@ import (
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/op/go-logging" "github.com/op/go-logging"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap" "github.com/up9inc/mizu/tap"
tapApi "github.com/up9inc/mizu/tap/api" tapApi "github.com/up9inc/mizu/tap/api"
) )

View File

@@ -5,7 +5,7 @@ import (
basenine "github.com/up9inc/basenine/client/go" basenine "github.com/up9inc/basenine/client/go"
"github.com/up9inc/mizu/agent/pkg/models" "github.com/up9inc/mizu/agent/pkg/models"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/logger"
tapApi "github.com/up9inc/mizu/tap/api" tapApi "github.com/up9inc/mizu/tap/api"
) )

View File

@@ -24,8 +24,8 @@ import (
"github.com/up9inc/mizu/agent/pkg/resolver" "github.com/up9inc/mizu/agent/pkg/resolver"
"github.com/up9inc/mizu/agent/pkg/utils" "github.com/up9inc/mizu/agent/pkg/utils"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
tapApi "github.com/up9inc/mizu/tap/api" tapApi "github.com/up9inc/mizu/tap/api"
basenine "github.com/up9inc/basenine/client/go" basenine "github.com/up9inc/basenine/client/go"
@@ -128,11 +128,11 @@ BasenineReconnect:
for item := range outputItems { for item := range outputItems {
extension := extensionsMap[item.Protocol.Name] extension := extensionsMap[item.Protocol.Name]
resolvedSource, resolvedDestionation, namespace := resolveIP(item.ConnectionInfo) resolvedSource, resolvedDestionation, namespace := resolveIP(item.ConnectionInfo)
if namespace == "" && item.Namespace != tapApi.UNKNOWN_NAMESPACE { if namespace == "" && item.Namespace != tapApi.UNKNOWN_NAMESPACE {
namespace = item.Namespace namespace = item.Namespace
} }
mizuEntry := extension.Dissector.Analyze(item, resolvedSource, resolvedDestionation, namespace) mizuEntry := extension.Dissector.Analyze(item, resolvedSource, resolvedDestionation, namespace)
if extension.Protocol.Name == "http" { if extension.Protocol.Name == "http" {
if !disableOASValidation { if !disableOASValidation {

View File

@@ -6,8 +6,8 @@ import (
basenine "github.com/up9inc/basenine/client/go" basenine "github.com/up9inc/basenine/client/go"
"github.com/up9inc/mizu/agent/pkg/dependency" "github.com/up9inc/mizu/agent/pkg/dependency"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
tapApi "github.com/up9inc/mizu/tap/api" tapApi "github.com/up9inc/mizu/tap/api"
) )

View File

@@ -10,7 +10,7 @@ import (
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/up9inc/mizu/agent/pkg/models" "github.com/up9inc/mizu/agent/pkg/models"
"github.com/up9inc/mizu/agent/pkg/utils" "github.com/up9inc/mizu/agent/pkg/utils"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/logger"
tapApi "github.com/up9inc/mizu/tap/api" tapApi "github.com/up9inc/mizu/tap/api"
) )

View File

@@ -14,8 +14,8 @@ import (
tapApi "github.com/up9inc/mizu/tap/api" tapApi "github.com/up9inc/mizu/tap/api"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
) )
type BrowserClient struct { type BrowserClient struct {

View File

@@ -4,8 +4,8 @@ import (
"encoding/json" "encoding/json"
"github.com/up9inc/mizu/agent/pkg/providers/tappedPods" "github.com/up9inc/mizu/agent/pkg/providers/tappedPods"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
) )
func BroadcastTappedPodsStatus() { func BroadcastTappedPodsStatus() {

View File

@@ -10,7 +10,7 @@ import (
basenine "github.com/up9inc/basenine/client/go" basenine "github.com/up9inc/basenine/client/go"
"github.com/up9inc/mizu/agent/pkg/api" "github.com/up9inc/mizu/agent/pkg/api"
"github.com/up9inc/mizu/agent/pkg/utils" "github.com/up9inc/mizu/agent/pkg/utils"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/logger"
tapApi "github.com/up9inc/mizu/tap/api" tapApi "github.com/up9inc/mizu/tap/api"
amqpExt "github.com/up9inc/mizu/tap/extensions/amqp" amqpExt "github.com/up9inc/mizu/tap/extensions/amqp"
httpExt "github.com/up9inc/mizu/tap/extensions/http" httpExt "github.com/up9inc/mizu/tap/extensions/http"

View File

@@ -10,7 +10,7 @@ import (
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/logger"
) )
func HandleEntriesError(c *gin.Context, err error) bool { func HandleEntriesError(c *gin.Context, err error) bool {

View File

@@ -7,7 +7,7 @@ import (
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/up9inc/mizu/agent/pkg/dependency" "github.com/up9inc/mizu/agent/pkg/dependency"
"github.com/up9inc/mizu/agent/pkg/oas" "github.com/up9inc/mizu/agent/pkg/oas"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/logger"
) )
func GetOASServers(c *gin.Context) { func GetOASServers(c *gin.Context) {

View File

@@ -13,9 +13,9 @@ import (
"github.com/up9inc/mizu/agent/pkg/providers/tappers" "github.com/up9inc/mizu/agent/pkg/providers/tappers"
"github.com/up9inc/mizu/agent/pkg/up9" "github.com/up9inc/mizu/agent/pkg/up9"
"github.com/up9inc/mizu/agent/pkg/validation" "github.com/up9inc/mizu/agent/pkg/validation"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/kubernetes" "github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
) )
func HealthCheck(c *gin.Context) { func HealthCheck(c *gin.Context) {

View File

@@ -9,8 +9,8 @@ import (
"time" "time"
"github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v7"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/api" "github.com/up9inc/mizu/tap/api"
) )

View File

@@ -9,8 +9,8 @@ import (
"github.com/up9inc/mizu/agent/pkg/app" "github.com/up9inc/mizu/agent/pkg/app"
"github.com/up9inc/mizu/agent/pkg/har" "github.com/up9inc/mizu/agent/pkg/har"
"github.com/up9inc/mizu/agent/pkg/models" "github.com/up9inc/mizu/agent/pkg/models"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
tapApi "github.com/up9inc/mizu/tap/api" tapApi "github.com/up9inc/mizu/tap/api"
) )

View File

@@ -2,9 +2,10 @@ package har
import ( import (
"encoding/base64" "encoding/base64"
"github.com/up9inc/mizu/shared/logger"
"time" "time"
"unicode/utf8" "unicode/utf8"
"github.com/up9inc/mizu/logger"
) )
/* /*

View File

@@ -8,7 +8,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/logger"
) )
// Keep it because we might want cookies in the future // Keep it because we might want cookies in the future

View File

@@ -16,7 +16,7 @@ import (
"github.com/up9inc/mizu/agent/pkg/har" "github.com/up9inc/mizu/agent/pkg/har"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/logger"
) )
func getFiles(baseDir string) (result []string, err error) { func getFiles(baseDir string) (result []string, err error) {

View File

@@ -11,7 +11,7 @@ import (
"github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/tap/api" "github.com/up9inc/mizu/tap/api"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/logger"
) )
var ( var (

View File

@@ -17,7 +17,7 @@ import (
"github.com/chanced/openapi" "github.com/chanced/openapi"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/nav-inc/datetime" "github.com/nav-inc/datetime"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/agent/pkg/har" "github.com/up9inc/mizu/agent/pkg/har"

View File

@@ -13,7 +13,7 @@ import (
"time" "time"
"github.com/chanced/openapi" "github.com/chanced/openapi"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/logger"
"github.com/wI2L/jsondiff" "github.com/wI2L/jsondiff"
basenine "github.com/up9inc/basenine/client/go" basenine "github.com/up9inc/basenine/client/go"

View File

@@ -8,7 +8,7 @@ import (
"strings" "strings"
"github.com/chanced/openapi" "github.com/chanced/openapi"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/logger"
) )
type NodePath = []string type NodePath = []string

View File

@@ -9,7 +9,7 @@ import (
"github.com/up9inc/mizu/agent/pkg/har" "github.com/up9inc/mizu/agent/pkg/har"
"github.com/chanced/openapi" "github.com/chanced/openapi"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/logger"
) )
func exampleResolver(ref string) (*openapi.ExampleObj, error) { func exampleResolver(ref string) (*openapi.ExampleObj, error) {

View File

@@ -7,8 +7,8 @@ import (
"github.com/up9inc/mizu/agent/pkg/providers/tappers" "github.com/up9inc/mizu/agent/pkg/providers/tappers"
"github.com/up9inc/mizu/agent/pkg/utils" "github.com/up9inc/mizu/agent/pkg/utils"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
) )
const FilePath = shared.DataDirPath + "tapped-pods.json" const FilePath = shared.DataDirPath + "tapped-pods.json"

View File

@@ -5,8 +5,8 @@ import (
"sync" "sync"
"github.com/up9inc/mizu/agent/pkg/utils" "github.com/up9inc/mizu/agent/pkg/utils"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
) )
const FilePath = shared.DataDirPath + "tappers-status.json" const FilePath = shared.DataDirPath + "tappers-status.json"

View File

@@ -5,7 +5,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/logger"
k8serrors "k8s.io/apimachinery/pkg/api/errors" k8serrors "k8s.io/apimachinery/pkg/api/errors"
cmap "github.com/orcaman/concurrent-map" cmap "github.com/orcaman/concurrent-map"

View File

@@ -10,7 +10,7 @@ import (
"github.com/up9inc/mizu/agent/pkg/har" "github.com/up9inc/mizu/agent/pkg/har"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared"
"github.com/yalp/jsonpath" "github.com/yalp/jsonpath"

View File

@@ -1,10 +1,11 @@
package servicemap package servicemap
import ( import (
"github.com/jinzhu/copier"
"sync" "sync"
"github.com/up9inc/mizu/shared/logger" "github.com/jinzhu/copier"
"github.com/up9inc/mizu/logger"
tapApi "github.com/up9inc/mizu/tap/api" tapApi "github.com/up9inc/mizu/tap/api"
) )

View File

@@ -17,8 +17,8 @@ import (
"github.com/up9inc/mizu/agent/pkg/utils" "github.com/up9inc/mizu/agent/pkg/utils"
basenine "github.com/up9inc/basenine/client/go" basenine "github.com/up9inc/basenine/client/go"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
tapApi "github.com/up9inc/mizu/tap/api" tapApi "github.com/up9inc/mizu/tap/api"
) )

View File

@@ -13,8 +13,8 @@ import (
"time" "time"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
) )
var ( var (

View File

@@ -4,15 +4,16 @@ import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/up9inc/mizu/cli/utils"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"net/url" "net/url"
"time" "time"
"github.com/up9inc/mizu/cli/utils"
"github.com/up9inc/mizu/cli/config" "github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
core "k8s.io/api/core/v1" core "k8s.io/api/core/v1"
) )

View File

@@ -12,7 +12,7 @@ import (
"github.com/up9inc/mizu/cli/config" "github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/config/configStructs" "github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/uiUtils" "github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/logger"
"golang.org/x/oauth2" "golang.org/x/oauth2"
) )

View File

@@ -5,7 +5,7 @@ import (
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/up9inc/mizu/cli/config/configStructs" "github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/telemetry" "github.com/up9inc/mizu/cli/telemetry"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/logger"
) )
var checkCmd = &cobra.Command{ var checkCmd = &cobra.Command{

View File

@@ -3,13 +3,14 @@ package check
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
core "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"regexp" "regexp"
"time" "time"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared/kubernetes"
core "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
) )
func ImagePullInCluster(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool { func ImagePullInCluster(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool {

View File

@@ -2,14 +2,14 @@ package check
import ( import (
"fmt" "fmt"
"github.com/up9inc/mizu/cli/config" "github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/uiUtils" "github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared/kubernetes" "github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/shared/semver" "github.com/up9inc/mizu/shared/semver"
) )
func KubernetesApi() (*kubernetes.Provider, *semver.SemVersion, bool) { func KubernetesApi() (*kubernetes.Provider, *semver.SemVersion, bool) {
logger.Log.Infof("\nkubernetes-api\n--------------------") logger.Log.Infof("\nkubernetes-api\n--------------------")

View File

@@ -4,15 +4,16 @@ import (
"context" "context"
"embed" "embed"
"fmt" "fmt"
"strings"
"github.com/up9inc/mizu/cli/bucket" "github.com/up9inc/mizu/cli/bucket"
"github.com/up9inc/mizu/cli/config" "github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/uiUtils" "github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared/kubernetes" "github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
rbac "k8s.io/api/rbac/v1" rbac "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
"strings"
) )
func TapKubernetesPermissions(ctx context.Context, embedFS embed.FS, kubernetesProvider *kubernetes.Provider) bool { func TapKubernetesPermissions(ctx context.Context, embedFS embed.FS, kubernetesProvider *kubernetes.Provider) bool {

View File

@@ -3,10 +3,11 @@ package check
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/up9inc/mizu/cli/config" "github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/uiUtils" "github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared/kubernetes" "github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
) )
func KubernetesResources(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool { func KubernetesResources(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool {

View File

@@ -2,9 +2,10 @@ package check
import ( import (
"fmt" "fmt"
"github.com/up9inc/mizu/cli/uiUtils" "github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared/kubernetes" "github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/shared/semver" "github.com/up9inc/mizu/shared/semver"
) )

View File

@@ -3,12 +3,13 @@ package check
import ( import (
"context" "context"
"fmt" "fmt"
"regexp"
"github.com/up9inc/mizu/cli/apiserver" "github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/config" "github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/uiUtils" "github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared/kubernetes" "github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
"regexp"
) )
func ServerConnection(kubernetesProvider *kubernetes.Provider) bool { func ServerConnection(kubernetesProvider *kubernetes.Provider) bool {

View File

@@ -4,10 +4,11 @@ import (
"context" "context"
"embed" "embed"
"fmt" "fmt"
"github.com/up9inc/mizu/cli/cmd/check" "github.com/up9inc/mizu/cli/cmd/check"
"github.com/up9inc/mizu/cli/config" "github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/uiUtils" "github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/logger"
) )
var ( var (

View File

@@ -19,8 +19,8 @@ import (
"github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/cli/config" "github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared/kubernetes" "github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
) )
func GetApiServerUrl(port uint16) string { func GetApiServerUrl(port uint16) string {

View File

@@ -9,7 +9,7 @@ import (
"github.com/up9inc/mizu/cli/config/configStructs" "github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/telemetry" "github.com/up9inc/mizu/cli/telemetry"
"github.com/up9inc/mizu/cli/uiUtils" "github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/logger"
) )
var configCmd = &cobra.Command{ var configCmd = &cobra.Command{

View File

@@ -4,7 +4,7 @@ import (
"reflect" "reflect"
"runtime/debug" "runtime/debug"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/logger"
) )
func HandleExcWrapper(fn interface{}, params ...interface{}) (result []reflect.Value) { func HandleExcWrapper(fn interface{}, params ...interface{}) (result []reflect.Value) {

View File

@@ -2,9 +2,10 @@ package cmd
import ( import (
"fmt" "fmt"
"github.com/up9inc/mizu/cli/bucket" "github.com/up9inc/mizu/cli/bucket"
"github.com/up9inc/mizu/cli/config" "github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/logger"
) )
func runMizuInstall() { func runMizuInstall() {

View File

@@ -10,7 +10,7 @@ import (
"github.com/up9inc/mizu/cli/errormessage" "github.com/up9inc/mizu/cli/errormessage"
"github.com/up9inc/mizu/cli/mizu/fsUtils" "github.com/up9inc/mizu/cli/mizu/fsUtils"
"github.com/up9inc/mizu/cli/telemetry" "github.com/up9inc/mizu/cli/telemetry"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/logger"
) )
var logsCmd = &cobra.Command{ var logsCmd = &cobra.Command{

View File

@@ -11,7 +11,7 @@ import (
"github.com/up9inc/mizu/cli/mizu/fsUtils" "github.com/up9inc/mizu/cli/mizu/fsUtils"
"github.com/up9inc/mizu/cli/mizu/version" "github.com/up9inc/mizu/cli/mizu/version"
"github.com/up9inc/mizu/cli/uiUtils" "github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/logger"
) )
var rootCmd = &cobra.Command{ var rootCmd = &cobra.Command{

View File

@@ -14,8 +14,8 @@ import (
"github.com/up9inc/mizu/cli/config/configStructs" "github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/errormessage" "github.com/up9inc/mizu/cli/errormessage"
"github.com/up9inc/mizu/cli/uiUtils" "github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
) )
const uploadTrafficMessageToConfirm = `NOTE: running mizu with --%s flag will upload recorded traffic for further analysis and enriched presentation options.` const uploadTrafficMessageToConfirm = `NOTE: running mizu with --%s flag will upload recorded traffic for further analysis and enriched presentation options.`

View File

@@ -25,9 +25,9 @@ import (
"github.com/up9inc/mizu/cli/config/configStructs" "github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/errormessage" "github.com/up9inc/mizu/cli/errormessage"
"github.com/up9inc/mizu/cli/uiUtils" "github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/kubernetes" "github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/api" "github.com/up9inc/mizu/tap/api"
) )

View File

@@ -7,7 +7,7 @@ import (
"github.com/up9inc/mizu/cli/config" "github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/config/configStructs" "github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/telemetry" "github.com/up9inc/mizu/cli/telemetry"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/logger"
"github.com/creasty/defaults" "github.com/creasty/defaults"
"github.com/spf13/cobra" "github.com/spf13/cobra"

View File

@@ -6,7 +6,7 @@ import (
"github.com/up9inc/mizu/cli/config" "github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/config/configStructs" "github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/telemetry" "github.com/up9inc/mizu/cli/telemetry"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/logger"
) )
var viewCmd = &cobra.Command{ var viewCmd = &cobra.Command{

View File

@@ -11,8 +11,8 @@ import (
"github.com/up9inc/mizu/cli/config" "github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/mizu/fsUtils" "github.com/up9inc/mizu/cli/mizu/fsUtils"
"github.com/up9inc/mizu/cli/uiUtils" "github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared/kubernetes" "github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
) )
func runMizuView() { func runMizuView() {

View File

@@ -9,8 +9,8 @@ import (
"strconv" "strconv"
"strings" "strings"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
"github.com/creasty/defaults" "github.com/creasty/defaults"
"github.com/spf13/cobra" "github.com/spf13/cobra"

View File

@@ -11,7 +11,7 @@ import (
"github.com/up9inc/mizu/cli/uiUtils" "github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared/units" "github.com/up9inc/mizu/shared/units"
) )

View File

@@ -11,6 +11,7 @@ require (
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
github.com/spf13/cobra v1.3.0 github.com/spf13/cobra v1.3.0
github.com/spf13/pflag v1.0.5 github.com/spf13/pflag v1.0.5
github.com/up9inc/mizu/logger v0.0.0
github.com/up9inc/mizu/shared v0.0.0 github.com/up9inc/mizu/shared v0.0.0
github.com/up9inc/mizu/tap/api v0.0.0 github.com/up9inc/mizu/tap/api v0.0.0
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8
@@ -98,6 +99,8 @@ require (
sigs.k8s.io/yaml v1.3.0 // indirect sigs.k8s.io/yaml v1.3.0 // indirect
) )
replace github.com/up9inc/mizu/logger v0.0.0 => ../logger
replace github.com/up9inc/mizu/shared v0.0.0 => ../shared replace github.com/up9inc/mizu/shared v0.0.0 => ../shared
replace github.com/up9inc/mizu/tap/api v0.0.0 => ../tap/api replace github.com/up9inc/mizu/tap/api v0.0.0 => ../tap/api

View File

@@ -10,8 +10,8 @@ import (
"github.com/up9inc/mizu/cli/config" "github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/mizu" "github.com/up9inc/mizu/cli/mizu"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared/kubernetes" "github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
) )
func GetLogFilePath() string { func GetLogFilePath() string {

View File

@@ -8,7 +8,7 @@ import (
"path/filepath" "path/filepath"
"strings" "strings"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/logger"
) )
func AddFileToZip(zipWriter *zip.Writer, filename string) error { func AddFileToZip(zipWriter *zip.Writer, filename string) error {

View File

@@ -13,7 +13,7 @@ import (
"github.com/up9inc/mizu/cli/apiserver" "github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/mizu" "github.com/up9inc/mizu/cli/mizu"
"github.com/up9inc/mizu/cli/pkg/version" "github.com/up9inc/mizu/cli/pkg/version"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/logger"
"github.com/google/go-github/v37/github" "github.com/google/go-github/v37/github"
"github.com/up9inc/mizu/cli/uiUtils" "github.com/up9inc/mizu/cli/uiUtils"

View File

@@ -3,12 +3,13 @@ package resources
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/up9inc/mizu/cli/errormessage" "github.com/up9inc/mizu/cli/errormessage"
"github.com/up9inc/mizu/cli/mizu/fsUtils" "github.com/up9inc/mizu/cli/mizu/fsUtils"
"github.com/up9inc/mizu/cli/uiUtils" "github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/cli/utils" "github.com/up9inc/mizu/cli/utils"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared/kubernetes" "github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
) )
@@ -54,7 +55,6 @@ func cleanUpNonRestrictedMode(ctx context.Context, cancel context.CancelFunc, ku
} }
} }
if resources, err := kubernetesProvider.ListManagedClusterRoleBindings(ctx); err != nil { if resources, err := kubernetesProvider.ListManagedClusterRoleBindings(ctx); err != nil {
resourceDesc := "ClusterRoleBindings" resourceDesc := "ClusterRoleBindings"
handleDeletionError(err, resourceDesc, &leftoverResources) handleDeletionError(err, resourceDesc, &leftoverResources)

View File

@@ -8,9 +8,9 @@ import (
"github.com/up9inc/mizu/cli/errormessage" "github.com/up9inc/mizu/cli/errormessage"
"github.com/up9inc/mizu/cli/mizu" "github.com/up9inc/mizu/cli/mizu"
"github.com/up9inc/mizu/cli/uiUtils" "github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/kubernetes" "github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
core "k8s.io/api/core/v1" core "k8s.io/api/core/v1"
) )

View File

@@ -4,14 +4,15 @@ import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http"
"os"
"time"
"github.com/denisbrodbeck/machineid" "github.com/denisbrodbeck/machineid"
"github.com/up9inc/mizu/cli/apiserver" "github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/config" "github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/mizu" "github.com/up9inc/mizu/cli/mizu"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/logger"
"net/http"
"os"
"time"
) )
const telemetryUrl = "https://us-east4-up9-prod.cloudfunctions.net/mizu-telemetry" const telemetryUrl = "https://us-east4-up9-prod.cloudfunctions.net/mizu-telemetry"

View File

@@ -6,7 +6,7 @@ import (
"os" "os"
"strings" "strings"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/logger"
) )
func AskForConfirmation(s string) bool { func AskForConfirmation(s string) bool {

View File

@@ -5,7 +5,7 @@ import (
"os/exec" "os/exec"
"runtime" "runtime"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/logger"
) )
func OpenBrowser(url string) { func OpenBrowser(url string) {

View File

@@ -2,10 +2,11 @@ package utils
import ( import (
"context" "context"
"github.com/up9inc/mizu/shared/logger"
"os" "os"
"os/signal" "os/signal"
"syscall" "syscall"
"github.com/up9inc/mizu/logger"
) )
func WaitForFinish(ctx context.Context, cancel context.CancelFunc) { func WaitForFinish(ctx context.Context, cancel context.CancelFunc) {

12
devops/ui-common-pack.sh Executable file
View File

@@ -0,0 +1,12 @@
#!/bin/bash
# exit when any command fails
set -e
dst_folder=$1
echo "dst folder: $dst_folder";
cd $dst_folder/../ui-common
npm i
npm pack
mv up9-mizu-common-0.0.0.tgz $dst_folder

5
logger/go.mod Normal file
View File

@@ -0,0 +1,5 @@
module github.com/up9inc/mizu/logger
go 1.17
require github.com/op/go-logging v0.0.0-20160315200505-970db520ece7

2
logger/go.sum Normal file
View File

@@ -0,0 +1,2 @@
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 h1:lDH9UUVJtmYCjyT0CI4q8xvlXPxeZ0gYCVvWbmPlp88=
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk=

View File

@@ -10,7 +10,6 @@ const (
ValidationRulesFileName = "validation-rules.yaml" ValidationRulesFileName = "validation-rules.yaml"
ContractFileName = "contract-oas.yaml" ContractFileName = "contract-oas.yaml"
ConfigFileName = "mizu-config.json" ConfigFileName = "mizu-config.json"
GoGCEnvVar = "GOGC"
DefaultApiServerPort = 8899 DefaultApiServerPort = 8899
LogLevelEnvVar = "LOG_LEVEL" LogLevelEnvVar = "LOG_LEVEL"
MizuAgentImageRepo = "docker.io/up9inc/mizu" MizuAgentImageRepo = "docker.io/up9inc/mizu"

View File

@@ -6,6 +6,7 @@ require (
github.com/docker/go-units v0.4.0 github.com/docker/go-units v0.4.0
github.com/golang-jwt/jwt/v4 v4.2.0 github.com/golang-jwt/jwt/v4 v4.2.0
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
github.com/up9inc/mizu/logger v0.0.0
github.com/up9inc/mizu/tap/api v0.0.0 github.com/up9inc/mizu/tap/api v0.0.0
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
k8s.io/api v0.23.3 k8s.io/api v0.23.3
@@ -91,4 +92,6 @@ require (
sigs.k8s.io/yaml v1.3.0 // indirect sigs.k8s.io/yaml v1.3.0 // indirect
) )
replace github.com/up9inc/mizu/logger v0.0.0 => ../logger
replace github.com/up9inc/mizu/tap/api v0.0.0 => ../tap/api replace github.com/up9inc/mizu/tap/api v0.0.0 => ../tap/api

View File

@@ -7,9 +7,9 @@ import (
"time" "time"
"github.com/op/go-logging" "github.com/op/go-logging"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/debounce" "github.com/up9inc/mizu/shared/debounce"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/api" "github.com/up9inc/mizu/tap/api"
core "k8s.io/api/core/v1" core "k8s.io/api/core/v1"
) )

View File

@@ -12,8 +12,8 @@ import (
"regexp" "regexp"
"github.com/op/go-logging" "github.com/op/go-logging"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/shared/semver" "github.com/up9inc/mizu/shared/semver"
"github.com/up9inc/mizu/tap/api" "github.com/up9inc/mizu/tap/api"
auth "k8s.io/api/authorization/v1" auth "k8s.io/api/authorization/v1"
@@ -768,7 +768,6 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
agentContainer.WithEnv( agentContainer.WithEnv(
applyconfcore.EnvVar().WithName(shared.LogLevelEnvVar).WithValue(logLevel.String()), applyconfcore.EnvVar().WithName(shared.LogLevelEnvVar).WithValue(logLevel.String()),
applyconfcore.EnvVar().WithName(shared.HostModeEnvVar).WithValue("1"), applyconfcore.EnvVar().WithName(shared.HostModeEnvVar).WithValue("1"),
applyconfcore.EnvVar().WithName(shared.GoGCEnvVar).WithValue("12800"),
applyconfcore.EnvVar().WithName(shared.MizuFilteringOptionsEnvVar).WithValue(string(mizuApiFilteringOptionsJsonStr)), applyconfcore.EnvVar().WithName(shared.MizuFilteringOptionsEnvVar).WithValue(string(mizuApiFilteringOptionsJsonStr)),
) )
agentContainer.WithEnv( agentContainer.WithEnv(

View File

@@ -16,7 +16,7 @@ import (
"k8s.io/client-go/tools/portforward" "k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy" "k8s.io/client-go/transport/spdy"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/logger"
"k8s.io/kubectl/pkg/proxy" "k8s.io/kubectl/pkg/proxy"
) )

View File

@@ -7,8 +7,8 @@ import (
"sync" "sync"
"time" "time"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared/debounce" "github.com/up9inc/mizu/shared/debounce"
"github.com/up9inc/mizu/shared/logger"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
) )

View File

@@ -5,7 +5,7 @@ import (
"strings" "strings"
"github.com/op/go-logging" "github.com/op/go-logging"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/logger"
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"

View File

@@ -104,11 +104,7 @@ type OutputChannelItem struct {
Namespace string Namespace string
} }
type SuperTimer struct { type ProtoIdentifier struct {
CaptureTime time.Time
}
type SuperIdentifier struct {
Protocol *Protocol Protocol *Protocol
IsClosedOthers bool IsClosedOthers bool
} }
@@ -130,7 +126,7 @@ func (p *ReadProgress) Current() (n int) {
type Dissector interface { type Dissector interface {
Register(*Extension) Register(*Extension)
Ping() Ping()
Dissect(b *bufio.Reader, progress *ReadProgress, capture Capture, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions, reqResMatcher RequestResponseMatcher) error Dissect(b *bufio.Reader, reader TcpReader, options *TrafficFilteringOptions) error
Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *Entry Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *Entry
Summarize(entry *Entry) *BaseEntry Summarize(entry *Entry) *BaseEntry
Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, err error) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, err error)
@@ -406,3 +402,39 @@ func (r *HTTPResponseWrapper) MarshalJSON() ([]byte, error) {
Response: r.Response, Response: r.Response,
}) })
} }
type TcpReaderDataMsg interface {
GetBytes() []byte
GetTimestamp() time.Time
}
type TcpReader interface {
Read(p []byte) (int, error)
GetReqResMatcher() RequestResponseMatcher
GetIsClient() bool
GetReadProgress() *ReadProgress
GetParent() TcpStream
GetTcpID() *TcpID
GetCounterPair() *CounterPair
GetCaptureTime() time.Time
GetEmitter() Emitter
GetIsClosed() bool
GetExtension() *Extension
}
type TcpStream interface {
SetProtocol(protocol *Protocol)
GetOrigin() Capture
GetProtoIdentifier() *ProtoIdentifier
GetReqResMatchers() []RequestResponseMatcher
GetIsTapTarget() bool
GetIsClosed() bool
}
type TcpStreamMap interface {
Range(f func(key, value interface{}) bool)
Store(key, value interface{})
Delete(key interface{})
NextId() int64
CloseTimedoutTcpStreamChannels()
}

View File

@@ -3,3 +3,7 @@ module github.com/up9inc/mizu/tap/api
go 1.17 go 1.17
require github.com/google/martian v2.1.0+incompatible require github.com/google/martian v2.1.0+incompatible
replace github.com/up9inc/mizu/logger v0.0.0 => ../../logger
replace github.com/up9inc/mizu/shared v0.0.0 => ../../shared

View File

@@ -5,7 +5,7 @@ import (
"time" "time"
"github.com/google/gopacket/reassembly" "github.com/google/gopacket/reassembly"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/tap/api" "github.com/up9inc/mizu/tap/api"
) )
@@ -22,7 +22,7 @@ type Cleaner struct {
connectionTimeout time.Duration connectionTimeout time.Duration
stats CleanerStats stats CleanerStats
statsMutex sync.Mutex statsMutex sync.Mutex
streamsMap *tcpStreamMap streamsMap api.TcpStreamMap
} }
func (cl *Cleaner) clean() { func (cl *Cleaner) clean() {
@@ -33,13 +33,15 @@ func (cl *Cleaner) clean() {
flushed, closed := cl.assembler.FlushCloseOlderThan(startCleanTime.Add(-cl.connectionTimeout)) flushed, closed := cl.assembler.FlushCloseOlderThan(startCleanTime.Add(-cl.connectionTimeout))
cl.assemblerMutex.Unlock() cl.assemblerMutex.Unlock()
cl.streamsMap.streams.Range(func(k, v interface{}) bool { cl.streamsMap.Range(func(k, v interface{}) bool {
reqResMatcher := v.(*tcpStreamWrapper).reqResMatcher reqResMatchers := v.(api.TcpStream).GetReqResMatchers()
if reqResMatcher == nil { for _, reqResMatcher := range reqResMatchers {
return true if reqResMatcher == nil {
continue
}
deleted := deleteOlderThan(reqResMatcher.GetMap(), startCleanTime.Add(-cl.connectionTimeout))
cl.stats.deleted += deleted
} }
deleted := deleteOlderThan(reqResMatcher.GetMap(), startCleanTime.Add(-cl.connectionTimeout))
cl.stats.deleted += deleted
return true return true
}) })

View File

@@ -8,7 +8,7 @@ import (
"strconv" "strconv"
"time" "time"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/tap/api" "github.com/up9inc/mizu/tap/api"
) )

View File

@@ -5,7 +5,7 @@ import (
"sync" "sync"
"github.com/google/gopacket/examples/util" "github.com/google/gopacket/examples/util"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/logger"
) )
var TapErrors *errorsMap var TapErrors *errorsMap

View File

@@ -1,6 +1,6 @@
package diagnose package diagnose
import "github.com/up9inc/mizu/shared/logger" import "github.com/up9inc/mizu/logger"
type tapperInternalStats struct { type tapperInternalStats struct {
Ipdefrag int Ipdefrag int

View File

@@ -15,3 +15,5 @@ require (
) )
replace github.com/up9inc/mizu/tap/api v0.0.0 => ../../api replace github.com/up9inc/mizu/tap/api v0.0.0 => ../../api
replace github.com/up9inc/mizu/logger v0.0.0 => ../../../logger

View File

@@ -39,17 +39,17 @@ func (d dissecting) Ping() {
const amqpRequest string = "amqp_request" const amqpRequest string = "amqp_request"
func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error { func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.TrafficFilteringOptions) error {
r := AmqpReader{b} r := AmqpReader{b}
var remaining int var remaining int
var header *HeaderFrame var header *HeaderFrame
connectionInfo := &api.ConnectionInfo{ connectionInfo := &api.ConnectionInfo{
ClientIP: tcpID.SrcIP, ClientIP: reader.GetTcpID().SrcIP,
ClientPort: tcpID.SrcPort, ClientPort: reader.GetTcpID().SrcPort,
ServerIP: tcpID.DstIP, ServerIP: reader.GetTcpID().DstIP,
ServerPort: tcpID.DstPort, ServerPort: reader.GetTcpID().DstPort,
IsOutgoing: true, IsOutgoing: true,
} }
@@ -75,7 +75,7 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture
var lastMethodFrameMessage Message var lastMethodFrameMessage Message
for { for {
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &protocol { if reader.GetParent().GetProtoIdentifier().Protocol != nil && reader.GetParent().GetProtoIdentifier().Protocol != &protocol {
return errors.New("Identified by another protocol") return errors.New("Identified by another protocol")
} }
@@ -112,12 +112,12 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture
switch lastMethodFrameMessage.(type) { switch lastMethodFrameMessage.(type) {
case *BasicPublish: case *BasicPublish:
eventBasicPublish.Body = f.Body eventBasicPublish.Body = f.Body
superIdentifier.Protocol = &protocol reader.GetParent().SetProtocol(&protocol)
emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture) emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
case *BasicDeliver: case *BasicDeliver:
eventBasicDeliver.Body = f.Body eventBasicDeliver.Body = f.Body
superIdentifier.Protocol = &protocol reader.GetParent().SetProtocol(&protocol)
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture) emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
} }
case *MethodFrame: case *MethodFrame:
@@ -137,8 +137,8 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture
NoWait: m.NoWait, NoWait: m.NoWait,
Arguments: m.Arguments, Arguments: m.Arguments,
} }
superIdentifier.Protocol = &protocol reader.GetParent().SetProtocol(&protocol)
emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture) emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
case *BasicConsume: case *BasicConsume:
eventBasicConsume := &BasicConsume{ eventBasicConsume := &BasicConsume{
@@ -150,8 +150,8 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture
NoWait: m.NoWait, NoWait: m.NoWait,
Arguments: m.Arguments, Arguments: m.Arguments,
} }
superIdentifier.Protocol = &protocol reader.GetParent().SetProtocol(&protocol)
emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture) emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
case *BasicDeliver: case *BasicDeliver:
eventBasicDeliver.ConsumerTag = m.ConsumerTag eventBasicDeliver.ConsumerTag = m.ConsumerTag
@@ -170,8 +170,8 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture
NoWait: m.NoWait, NoWait: m.NoWait,
Arguments: m.Arguments, Arguments: m.Arguments,
} }
superIdentifier.Protocol = &protocol reader.GetParent().SetProtocol(&protocol)
emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture) emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
case *ExchangeDeclare: case *ExchangeDeclare:
eventExchangeDeclare := &ExchangeDeclare{ eventExchangeDeclare := &ExchangeDeclare{
@@ -184,8 +184,8 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture
NoWait: m.NoWait, NoWait: m.NoWait,
Arguments: m.Arguments, Arguments: m.Arguments,
} }
superIdentifier.Protocol = &protocol reader.GetParent().SetProtocol(&protocol)
emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture) emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
case *ConnectionStart: case *ConnectionStart:
eventConnectionStart := &ConnectionStart{ eventConnectionStart := &ConnectionStart{
@@ -195,8 +195,8 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture
Mechanisms: m.Mechanisms, Mechanisms: m.Mechanisms,
Locales: m.Locales, Locales: m.Locales,
} }
superIdentifier.Protocol = &protocol reader.GetParent().SetProtocol(&protocol)
emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture) emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
case *ConnectionClose: case *ConnectionClose:
eventConnectionClose := &ConnectionClose{ eventConnectionClose := &ConnectionClose{
@@ -205,8 +205,8 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture
ClassId: m.ClassId, ClassId: m.ClassId,
MethodId: m.MethodId, MethodId: m.MethodId,
} }
superIdentifier.Protocol = &protocol reader.GetParent().SetProtocol(&protocol)
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture) emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
} }
default: default:

View File

@@ -106,7 +106,6 @@ func TestDissect(t *testing.T) {
Request: 0, Request: 0,
Response: 0, Response: 0,
} }
superIdentifier := &api.SuperIdentifier{}
// Request // Request
pathClient := _path pathClient := _path
@@ -122,7 +121,21 @@ func TestDissect(t *testing.T) {
DstPort: "2", DstPort: "2",
} }
reqResMatcher := dissector.NewResponseRequestMatcher() reqResMatcher := dissector.NewResponseRequestMatcher()
err = dissector.Dissect(bufferClient, &api.ReadProgress{}, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) stream := NewTcpStream(api.Pcap)
reader := NewTcpReader(
&api.ReadProgress{},
"",
tcpIDClient,
time.Time{},
stream,
true,
false,
nil,
emitter,
counterPair,
reqResMatcher,
)
err = dissector.Dissect(bufferClient, reader, options)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err) panic(err)
} }
@@ -140,7 +153,20 @@ func TestDissect(t *testing.T) {
SrcPort: "2", SrcPort: "2",
DstPort: "1", DstPort: "1",
} }
err = dissector.Dissect(bufferServer, &api.ReadProgress{}, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) reader = NewTcpReader(
&api.ReadProgress{},
"",
tcpIDServer,
time.Time{},
stream,
false,
false,
nil,
emitter,
counterPair,
reqResMatcher,
)
err = dissector.Dissect(bufferServer, reader, options)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err) panic(err)
} }

View File

@@ -0,0 +1,84 @@
package amqp
import (
"sync"
"time"
"github.com/up9inc/mizu/tap/api"
)
type tcpReader struct {
ident string
tcpID *api.TcpID
isClosed bool
isClient bool
isOutgoing bool
progress *api.ReadProgress
captureTime time.Time
parent api.TcpStream
extension *api.Extension
emitter api.Emitter
counterPair *api.CounterPair
reqResMatcher api.RequestResponseMatcher
sync.Mutex
}
func NewTcpReader(progress *api.ReadProgress, ident string, tcpId *api.TcpID, captureTime time.Time, parent api.TcpStream, isClient bool, isOutgoing bool, extension *api.Extension, emitter api.Emitter, counterPair *api.CounterPair, reqResMatcher api.RequestResponseMatcher) api.TcpReader {
return &tcpReader{
progress: progress,
ident: ident,
tcpID: tcpId,
captureTime: captureTime,
parent: parent,
isClient: isClient,
isOutgoing: isOutgoing,
extension: extension,
emitter: emitter,
counterPair: counterPair,
reqResMatcher: reqResMatcher,
}
}
func (reader *tcpReader) Read(p []byte) (int, error) {
return 0, nil
}
func (reader *tcpReader) GetReqResMatcher() api.RequestResponseMatcher {
return reader.reqResMatcher
}
func (reader *tcpReader) GetIsClient() bool {
return reader.isClient
}
func (reader *tcpReader) GetReadProgress() *api.ReadProgress {
return reader.progress
}
func (reader *tcpReader) GetParent() api.TcpStream {
return reader.parent
}
func (reader *tcpReader) GetTcpID() *api.TcpID {
return reader.tcpID
}
func (reader *tcpReader) GetCounterPair() *api.CounterPair {
return reader.counterPair
}
func (reader *tcpReader) GetCaptureTime() time.Time {
return reader.captureTime
}
func (reader *tcpReader) GetEmitter() api.Emitter {
return reader.emitter
}
func (reader *tcpReader) GetIsClosed() bool {
return reader.isClosed
}
func (reader *tcpReader) GetExtension() *api.Extension {
return reader.extension
}

View File

@@ -0,0 +1,45 @@
package amqp
import (
"sync"
"github.com/up9inc/mizu/tap/api"
)
type tcpStream struct {
isClosed bool
protoIdentifier *api.ProtoIdentifier
isTapTarget bool
origin api.Capture
reqResMatchers []api.RequestResponseMatcher
sync.Mutex
}
func NewTcpStream(capture api.Capture) api.TcpStream {
return &tcpStream{
origin: capture,
protoIdentifier: &api.ProtoIdentifier{},
}
}
func (t *tcpStream) SetProtocol(protocol *api.Protocol) {}
func (t *tcpStream) GetOrigin() api.Capture {
return t.origin
}
func (t *tcpStream) GetProtoIdentifier() *api.ProtoIdentifier {
return t.protoIdentifier
}
func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {
return t.reqResMatchers
}
func (t *tcpStream) GetIsTapTarget() bool {
return t.isTapTarget
}
func (t *tcpStream) GetIsClosed() bool {
return t.isClosed
}

View File

@@ -18,3 +18,5 @@ require (
) )
replace github.com/up9inc/mizu/tap/api v0.0.0 => ../../api replace github.com/up9inc/mizu/tap/api v0.0.0 => ../../api
replace github.com/up9inc/mizu/logger v0.0.0 => ../../../logger

View File

@@ -8,6 +8,7 @@ import (
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"strings" "strings"
"time"
"github.com/up9inc/mizu/tap/api" "github.com/up9inc/mizu/tap/api"
) )
@@ -47,7 +48,7 @@ func replaceForwardedFor(item *api.OutputChannelItem) {
item.ConnectionInfo.ClientPort = "" item.ConnectionInfo.ClientPort = ""
} }
func handleHTTP2Stream(http2Assembler *Http2Assembler, progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) error { func handleHTTP2Stream(http2Assembler *Http2Assembler, progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, captureTime time.Time, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) error {
streamID, messageHTTP1, isGrpc, err := http2Assembler.readMessage() streamID, messageHTTP1, isGrpc, err := http2Assembler.readMessage()
if err != nil { if err != nil {
return err return err
@@ -66,7 +67,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, progress *api.ReadProgres
streamID, streamID,
"HTTP2", "HTTP2",
) )
item = reqResMatcher.registerRequest(ident, &messageHTTP1, superTimer.CaptureTime, progress.Current(), messageHTTP1.ProtoMinor) item = reqResMatcher.registerRequest(ident, &messageHTTP1, captureTime, progress.Current(), messageHTTP1.ProtoMinor)
if item != nil { if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{ item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.SrcIP, ClientIP: tcpID.SrcIP,
@@ -86,7 +87,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, progress *api.ReadProgres
streamID, streamID,
"HTTP2", "HTTP2",
) )
item = reqResMatcher.registerResponse(ident, &messageHTTP1, superTimer.CaptureTime, progress.Current(), messageHTTP1.ProtoMinor) item = reqResMatcher.registerResponse(ident, &messageHTTP1, captureTime, progress.Current(), messageHTTP1.ProtoMinor)
if item != nil { if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{ item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.DstIP, ClientIP: tcpID.DstIP,
@@ -111,7 +112,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, progress *api.ReadProgres
return nil return nil
} }
func handleHTTP1ClientStream(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, req *http.Request, err error) { func handleHTTP1ClientStream(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, captureTime time.Time, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, req *http.Request, err error) {
req, err = http.ReadRequest(b) req, err = http.ReadRequest(b)
if err != nil { if err != nil {
return return
@@ -139,7 +140,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, progress *api.ReadProgress, captur
requestCounter, requestCounter,
"HTTP1", "HTTP1",
) )
item := reqResMatcher.registerRequest(ident, req, superTimer.CaptureTime, progress.Current(), req.ProtoMinor) item := reqResMatcher.registerRequest(ident, req, captureTime, progress.Current(), req.ProtoMinor)
if item != nil { if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{ item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.SrcIP, ClientIP: tcpID.SrcIP,
@@ -154,7 +155,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, progress *api.ReadProgress, captur
return return
} }
func handleHTTP1ServerStream(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, err error) { func handleHTTP1ServerStream(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, captureTime time.Time, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, err error) {
var res *http.Response var res *http.Response
res, err = http.ReadResponse(b, nil) res, err = http.ReadResponse(b, nil)
if err != nil { if err != nil {
@@ -183,7 +184,7 @@ func handleHTTP1ServerStream(b *bufio.Reader, progress *api.ReadProgress, captur
responseCounter, responseCounter,
"HTTP1", "HTTP1",
) )
item := reqResMatcher.registerResponse(ident, res, superTimer.CaptureTime, progress.Current(), res.ProtoMinor) item := reqResMatcher.registerResponse(ident, res, captureTime, progress.Current(), res.ProtoMinor)
if item != nil { if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{ item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.DstIP, ClientIP: tcpID.DstIP,

View File

@@ -86,15 +86,15 @@ func (d dissecting) Ping() {
log.Printf("pong %s", http11protocol.Name) log.Printf("pong %s", http11protocol.Name)
} }
func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error { func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.TrafficFilteringOptions) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher) reqResMatcher := reader.GetReqResMatcher().(*requestResponseMatcher)
var err error var err error
isHTTP2, _ := checkIsHTTP2Connection(b, isClient) isHTTP2, _ := checkIsHTTP2Connection(b, reader.GetIsClient())
var http2Assembler *Http2Assembler var http2Assembler *Http2Assembler
if isHTTP2 { if isHTTP2 {
err = prepareHTTP2Connection(b, isClient) err = prepareHTTP2Connection(b, reader.GetIsClient())
if err != nil { if err != nil {
return err return err
} }
@@ -105,74 +105,74 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture
for { for {
if switchingProtocolsHTTP2 { if switchingProtocolsHTTP2 {
switchingProtocolsHTTP2 = false switchingProtocolsHTTP2 = false
isHTTP2, err = checkIsHTTP2Connection(b, isClient) isHTTP2, err = checkIsHTTP2Connection(b, reader.GetIsClient())
if err != nil { if err != nil {
break break
} }
err = prepareHTTP2Connection(b, isClient) err = prepareHTTP2Connection(b, reader.GetIsClient())
if err != nil { if err != nil {
break break
} }
http2Assembler = createHTTP2Assembler(b) http2Assembler = createHTTP2Assembler(b)
} }
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &http11protocol { if reader.GetParent().GetProtoIdentifier().Protocol != nil && reader.GetParent().GetProtoIdentifier().Protocol != &http11protocol {
return errors.New("Identified by another protocol") return errors.New("Identified by another protocol")
} }
if isHTTP2 { if isHTTP2 {
err = handleHTTP2Stream(http2Assembler, progress, capture, tcpID, superTimer, emitter, options, reqResMatcher) err = handleHTTP2Stream(http2Assembler, reader.GetReadProgress(), reader.GetParent().GetOrigin(), reader.GetTcpID(), reader.GetCaptureTime(), reader.GetEmitter(), options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF { if err == io.EOF || err == io.ErrUnexpectedEOF {
break break
} else if err != nil { } else if err != nil {
continue continue
} }
superIdentifier.Protocol = &http11protocol reader.GetParent().SetProtocol(&http11protocol)
} else if isClient { } else if reader.GetIsClient() {
var req *http.Request var req *http.Request
switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, progress, capture, tcpID, counterPair, superTimer, emitter, options, reqResMatcher) switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, reader.GetReadProgress(), reader.GetParent().GetOrigin(), reader.GetTcpID(), reader.GetCounterPair(), reader.GetCaptureTime(), reader.GetEmitter(), options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF { if err == io.EOF || err == io.ErrUnexpectedEOF {
break break
} else if err != nil { } else if err != nil {
continue continue
} }
superIdentifier.Protocol = &http11protocol reader.GetParent().SetProtocol(&http11protocol)
// In case of an HTTP2 upgrade, duplicate the HTTP1 request into HTTP2 with stream ID 1 // In case of an HTTP2 upgrade, duplicate the HTTP1 request into HTTP2 with stream ID 1
if switchingProtocolsHTTP2 { if switchingProtocolsHTTP2 {
ident := fmt.Sprintf( ident := fmt.Sprintf(
"%s_%s_%s_%s_1_%s", "%s_%s_%s_%s_1_%s",
tcpID.SrcIP, reader.GetTcpID().SrcIP,
tcpID.DstIP, reader.GetTcpID().DstIP,
tcpID.SrcPort, reader.GetTcpID().SrcPort,
tcpID.DstPort, reader.GetTcpID().DstPort,
"HTTP2", "HTTP2",
) )
item := reqResMatcher.registerRequest(ident, req, superTimer.CaptureTime, progress.Current(), req.ProtoMinor) item := reqResMatcher.registerRequest(ident, req, reader.GetCaptureTime(), reader.GetReadProgress().Current(), req.ProtoMinor)
if item != nil { if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{ item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.SrcIP, ClientIP: reader.GetTcpID().SrcIP,
ClientPort: tcpID.SrcPort, ClientPort: reader.GetTcpID().SrcPort,
ServerIP: tcpID.DstIP, ServerIP: reader.GetTcpID().DstIP,
ServerPort: tcpID.DstPort, ServerPort: reader.GetTcpID().DstPort,
IsOutgoing: true, IsOutgoing: true,
} }
item.Capture = capture item.Capture = reader.GetParent().GetOrigin()
filterAndEmit(item, emitter, options) filterAndEmit(item, reader.GetEmitter(), options)
} }
} }
} else { } else {
switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, progress, capture, tcpID, counterPair, superTimer, emitter, options, reqResMatcher) switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, reader.GetReadProgress(), reader.GetParent().GetOrigin(), reader.GetTcpID(), reader.GetCounterPair(), reader.GetCaptureTime(), reader.GetEmitter(), options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF { if err == io.EOF || err == io.ErrUnexpectedEOF {
break break
} else if err != nil { } else if err != nil {
continue continue
} }
superIdentifier.Protocol = &http11protocol reader.GetParent().SetProtocol(&http11protocol)
} }
} }
if superIdentifier.Protocol == nil { if reader.GetParent().GetProtoIdentifier().Protocol == nil {
return err return err
} }

View File

@@ -108,7 +108,6 @@ func TestDissect(t *testing.T) {
Request: 0, Request: 0,
Response: 0, Response: 0,
} }
superIdentifier := &api.SuperIdentifier{}
// Request // Request
pathClient := _path pathClient := _path
@@ -124,7 +123,21 @@ func TestDissect(t *testing.T) {
DstPort: "2", DstPort: "2",
} }
reqResMatcher := dissector.NewResponseRequestMatcher() reqResMatcher := dissector.NewResponseRequestMatcher()
err = dissector.Dissect(bufferClient, &api.ReadProgress{}, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) stream := NewTcpStream(api.Pcap)
reader := NewTcpReader(
&api.ReadProgress{},
"",
tcpIDClient,
time.Time{},
stream,
true,
false,
nil,
emitter,
counterPair,
reqResMatcher,
)
err = dissector.Dissect(bufferClient, reader, options)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err) panic(err)
} }
@@ -142,7 +155,20 @@ func TestDissect(t *testing.T) {
SrcPort: "2", SrcPort: "2",
DstPort: "1", DstPort: "1",
} }
err = dissector.Dissect(bufferServer, &api.ReadProgress{}, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) reader = NewTcpReader(
&api.ReadProgress{},
"",
tcpIDServer,
time.Time{},
stream,
false,
false,
nil,
emitter,
counterPair,
reqResMatcher,
)
err = dissector.Dissect(bufferServer, reader, options)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err) panic(err)
} }

View File

@@ -0,0 +1,84 @@
package http
import (
"sync"
"time"
"github.com/up9inc/mizu/tap/api"
)
type tcpReader struct {
ident string
tcpID *api.TcpID
isClosed bool
isClient bool
isOutgoing bool
progress *api.ReadProgress
captureTime time.Time
parent api.TcpStream
extension *api.Extension
emitter api.Emitter
counterPair *api.CounterPair
reqResMatcher api.RequestResponseMatcher
sync.Mutex
}
func NewTcpReader(progress *api.ReadProgress, ident string, tcpId *api.TcpID, captureTime time.Time, parent api.TcpStream, isClient bool, isOutgoing bool, extension *api.Extension, emitter api.Emitter, counterPair *api.CounterPair, reqResMatcher api.RequestResponseMatcher) api.TcpReader {
return &tcpReader{
progress: progress,
ident: ident,
tcpID: tcpId,
captureTime: captureTime,
parent: parent,
isClient: isClient,
isOutgoing: isOutgoing,
extension: extension,
emitter: emitter,
counterPair: counterPair,
reqResMatcher: reqResMatcher,
}
}
func (reader *tcpReader) Read(p []byte) (int, error) {
return 0, nil
}
func (reader *tcpReader) GetReqResMatcher() api.RequestResponseMatcher {
return reader.reqResMatcher
}
func (reader *tcpReader) GetIsClient() bool {
return reader.isClient
}
func (reader *tcpReader) GetReadProgress() *api.ReadProgress {
return reader.progress
}
func (reader *tcpReader) GetParent() api.TcpStream {
return reader.parent
}
func (reader *tcpReader) GetTcpID() *api.TcpID {
return reader.tcpID
}
func (reader *tcpReader) GetCounterPair() *api.CounterPair {
return reader.counterPair
}
func (reader *tcpReader) GetCaptureTime() time.Time {
return reader.captureTime
}
func (reader *tcpReader) GetEmitter() api.Emitter {
return reader.emitter
}
func (reader *tcpReader) GetIsClosed() bool {
return reader.isClosed
}
func (reader *tcpReader) GetExtension() *api.Extension {
return reader.extension
}

View File

@@ -0,0 +1,45 @@
package http
import (
"sync"
"github.com/up9inc/mizu/tap/api"
)
type tcpStream struct {
isClosed bool
protoIdentifier *api.ProtoIdentifier
isTapTarget bool
origin api.Capture
reqResMatchers []api.RequestResponseMatcher
sync.Mutex
}
func NewTcpStream(capture api.Capture) api.TcpStream {
return &tcpStream{
origin: capture,
protoIdentifier: &api.ProtoIdentifier{},
}
}
func (t *tcpStream) SetProtocol(protocol *api.Protocol) {}
func (t *tcpStream) GetOrigin() api.Capture {
return t.origin
}
func (t *tcpStream) GetProtoIdentifier() *api.ProtoIdentifier {
return t.protoIdentifier
}
func (t *tcpStream) GetReqResMatchers() []api.RequestResponseMatcher {
return t.reqResMatchers
}
func (t *tcpStream) GetIsTapTarget() bool {
return t.isTapTarget
}
func (t *tcpStream) GetIsClosed() bool {
return t.isClosed
}

View File

@@ -22,3 +22,5 @@ require (
) )
replace github.com/up9inc/mizu/tap/api v0.0.0 => ../../api replace github.com/up9inc/mizu/tap/api v0.0.0 => ../../api
replace github.com/up9inc/mizu/logger v0.0.0 => ../../../logger

View File

@@ -35,25 +35,25 @@ func (d dissecting) Ping() {
log.Printf("pong %s", _protocol.Name) log.Printf("pong %s", _protocol.Name)
} }
func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error { func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.TrafficFilteringOptions) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher) reqResMatcher := reader.GetReqResMatcher().(*requestResponseMatcher)
for { for {
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &_protocol { if reader.GetParent().GetProtoIdentifier().Protocol != nil && reader.GetParent().GetProtoIdentifier().Protocol != &_protocol {
return errors.New("Identified by another protocol") return errors.New("Identified by another protocol")
} }
if isClient { if reader.GetIsClient() {
_, _, err := ReadRequest(b, tcpID, counterPair, superTimer, reqResMatcher) _, _, err := ReadRequest(b, reader.GetTcpID(), reader.GetCounterPair(), reader.GetCaptureTime(), reqResMatcher)
if err != nil { if err != nil {
return err return err
} }
superIdentifier.Protocol = &_protocol reader.GetParent().SetProtocol(&_protocol)
} else { } else {
err := ReadResponse(b, capture, tcpID, counterPair, superTimer, emitter, reqResMatcher) err := ReadResponse(b, reader.GetParent().GetOrigin(), reader.GetTcpID(), reader.GetCounterPair(), reader.GetCaptureTime(), reader.GetEmitter(), reqResMatcher)
if err != nil { if err != nil {
return err return err
} }
superIdentifier.Protocol = &_protocol reader.GetParent().SetProtocol(&_protocol)
} }
} }
} }

View File

@@ -106,7 +106,6 @@ func TestDissect(t *testing.T) {
Request: 0, Request: 0,
Response: 0, Response: 0,
} }
superIdentifier := &api.SuperIdentifier{}
// Request // Request
pathClient := _path pathClient := _path
@@ -123,7 +122,21 @@ func TestDissect(t *testing.T) {
} }
reqResMatcher := dissector.NewResponseRequestMatcher() reqResMatcher := dissector.NewResponseRequestMatcher()
reqResMatcher.SetMaxTry(10) reqResMatcher.SetMaxTry(10)
err = dissector.Dissect(bufferClient, &api.ReadProgress{}, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) stream := NewTcpStream(api.Pcap)
reader := NewTcpReader(
&api.ReadProgress{},
"",
tcpIDClient,
time.Time{},
stream,
true,
false,
nil,
emitter,
counterPair,
reqResMatcher,
)
err = dissector.Dissect(bufferClient, reader, options)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
log.Println(err) log.Println(err)
} }
@@ -141,7 +154,20 @@ func TestDissect(t *testing.T) {
SrcPort: "2", SrcPort: "2",
DstPort: "1", DstPort: "1",
} }
err = dissector.Dissect(bufferServer, &api.ReadProgress{}, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) reader = NewTcpReader(
&api.ReadProgress{},
"",
tcpIDServer,
time.Time{},
stream,
false,
false,
nil,
emitter,
counterPair,
reqResMatcher,
)
err = dissector.Dissect(bufferServer, reader, options)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
log.Println(err) log.Println(err)
} }

View File

@@ -19,7 +19,7 @@ type Request struct {
CaptureTime time.Time `json:"captureTime"` CaptureTime time.Time `json:"captureTime"`
} }
func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, reqResMatcher *requestResponseMatcher) (apiKey ApiKey, apiVersion int16, err error) { func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, captureTime time.Time, reqResMatcher *requestResponseMatcher) (apiKey ApiKey, apiVersion int16, err error) {
d := &decoder{reader: r, remain: 4} d := &decoder{reader: r, remain: 4}
size := d.readInt32() size := d.readInt32()
@@ -206,7 +206,7 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, su
ApiVersion: apiVersion, ApiVersion: apiVersion,
CorrelationID: correlationID, CorrelationID: correlationID,
ClientID: clientID, ClientID: clientID,
CaptureTime: superTimer.CaptureTime, CaptureTime: captureTime,
Payload: payload, Payload: payload,
} }

View File

@@ -16,7 +16,7 @@ type Response struct {
CaptureTime time.Time `json:"captureTime"` CaptureTime time.Time `json:"captureTime"`
} }
func ReadResponse(r io.Reader, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, reqResMatcher *requestResponseMatcher) (err error) { func ReadResponse(r io.Reader, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, captureTime time.Time, emitter api.Emitter, reqResMatcher *requestResponseMatcher) (err error) {
d := &decoder{reader: r, remain: 4} d := &decoder{reader: r, remain: 4}
size := d.readInt32() size := d.readInt32()
@@ -43,7 +43,7 @@ func ReadResponse(r io.Reader, capture api.Capture, tcpID *api.TcpID, counterPai
Size: size, Size: size,
CorrelationID: correlationID, CorrelationID: correlationID,
Payload: payload, Payload: payload,
CaptureTime: superTimer.CaptureTime, CaptureTime: captureTime,
} }
key := fmt.Sprintf( key := fmt.Sprintf(

View File

@@ -0,0 +1,84 @@
package kafka
import (
"sync"
"time"
"github.com/up9inc/mizu/tap/api"
)
type tcpReader struct {
ident string
tcpID *api.TcpID
isClosed bool
isClient bool
isOutgoing bool
progress *api.ReadProgress
captureTime time.Time
parent api.TcpStream
extension *api.Extension
emitter api.Emitter
counterPair *api.CounterPair
reqResMatcher api.RequestResponseMatcher
sync.Mutex
}
func NewTcpReader(progress *api.ReadProgress, ident string, tcpId *api.TcpID, captureTime time.Time, parent api.TcpStream, isClient bool, isOutgoing bool, extension *api.Extension, emitter api.Emitter, counterPair *api.CounterPair, reqResMatcher api.RequestResponseMatcher) api.TcpReader {
return &tcpReader{
progress: progress,
ident: ident,
tcpID: tcpId,
captureTime: captureTime,
parent: parent,
isClient: isClient,
isOutgoing: isOutgoing,
extension: extension,
emitter: emitter,
counterPair: counterPair,
reqResMatcher: reqResMatcher,
}
}
func (reader *tcpReader) Read(p []byte) (int, error) {
return 0, nil
}
func (reader *tcpReader) GetReqResMatcher() api.RequestResponseMatcher {
return reader.reqResMatcher
}
func (reader *tcpReader) GetIsClient() bool {
return reader.isClient
}
func (reader *tcpReader) GetReadProgress() *api.ReadProgress {
return reader.progress
}
func (reader *tcpReader) GetParent() api.TcpStream {
return reader.parent
}
func (reader *tcpReader) GetTcpID() *api.TcpID {
return reader.tcpID
}
func (reader *tcpReader) GetCounterPair() *api.CounterPair {
return reader.counterPair
}
func (reader *tcpReader) GetCaptureTime() time.Time {
return reader.captureTime
}
func (reader *tcpReader) GetEmitter() api.Emitter {
return reader.emitter
}
func (reader *tcpReader) GetIsClosed() bool {
return reader.isClosed
}
func (reader *tcpReader) GetExtension() *api.Extension {
return reader.extension
}

Some files were not shown because too many files have changed in this diff Show More