Compare commits

...

9 Commits

Author SHA1 Message Date
M. Mert Yıldıran
d3e6a69d82 Refactor tap module to achieve synchronously closing other protocol dissectors upon identification (#1026)
* Remove `tcpStreamWrapper` struct

* Refactor `tap` module and move some of the code to `tap/api` module

* Move `TrafficFilteringOptions` struct to `shared` module

* Change the `Dissect` method signature to have `*TcpReader` as an argument

* Add `CloseOtherProtocolDissectors` method and use it to synchronously close the other protocol dissectors

* Run `go mod tidy` in `cli` module

* Rename `SuperIdentifier` struct to `ProtoIdentifier`

* Remove `SuperTimer` struct

* Bring back `CloseTimedoutTcpStreamChannels` method

* Run `go mod tidy` everywhere

* Remove `GOGC` environment variable from tapper

* Fix the tests

* Bring back `debug.FreeOSMemory()` call

* Make `CloseOtherProtocolDissectors` method mutexed

* Revert "Remove `GOGC` environment variable from tapper"

This reverts commit cfc2484bbb.

* Bring back the removed `checksum`, `nooptcheck` and `ignorefsmerr` flags

* Define a bunch of interfaces and don't export any new structs from `tap/api`

* Keep the interfaces in `tap/api` but move the structs to `tap/tcp`

* Fix the unit tests by depending on `github.com/up9inc/mizu/tap`

* Use the modified `tlsEmitter`

* Define `TlsChunk` interface and make `tlsReader` implement `TcpReader`

* Remove unused fields in `tlsReader`

* Define `ReassemblyStream` interface and separate `gopacket` specififc fields to `tcpReassemblyStream` struct

Such that make `tap/api` don't depend on `gopacket`

* Remove the unused fields

* Make `tlsPoller` implement `TcpStream` interface and remove the call to `NewTcpStreamDummy` method

* Remove unused fields from `tlsPoller`

* Remove almost all of the setter methods in `TcpReader` and `TcpStream` interface and remove `TlsChunk` interface

* Revert "Revert "Remove `GOGC` environment variable from tapper""

This reverts commit ab2b9a803b.

* Revert "Bring back `debug.FreeOSMemory()` call"

This reverts commit 1cce863bbb.

* Remove excess comment

* Fix acceptance tests (`logger` module) #run_acceptance_tests

* Bring back `github.com/patrickmn/go-cache`

* Fix `NewTcpStream` method signature

* Put `tcpReader` and `tcpStream` mocks into protocol dissectors to remove `github.com/up9inc/mizu/tap` dependency

* Fix AMQP tests

* Revert 960ba644cd

* Revert `go.mod` and `go.sum` files in protocol dissectors

* Fix the comment position

* Revert `AppStatsInst` change

* Fix indent

* Fix CLI build

* Fix linter error

* Fix error msg

* Revert some of the changes in `chunk.go`
2022-04-28 17:19:14 +03:00
AmitUp9
ed9e162af0 validation that grpc error render only when needed (#1051) 2022-04-28 15:18:08 +03:00
Igor Gov
4e22e77597 Fix: remove agent unused go dependency (#1050) 2022-04-28 12:08:59 +03:00
AmitUp9
3978ace4ef searchable dropdown added to oas modal (#1044)
* searchable dropdown added to oas modal

* remove unnecessary attribute from autocomplete

* move css to sass file
2022-04-28 11:59:13 +03:00
Igor Gov
e71a12d399 Introducing eslint (#1048)
* Introducing eslint
2022-04-28 11:46:00 +03:00
M. Mert Yıldıran
90c54f9505 Fix acceptance tests (logger module) (#1049) 2022-04-27 23:59:16 +03:00
M. Mert Yıldıran
e1ad302c29 Make logger a separate module such that don't depend on shared module as a whole for logging (#1047)
* Make `logger` a separate module such that don't depend on `shared` module as a whole for logging

* Update `Dockerfile`
2022-04-27 22:26:27 +03:00
lirazyehezkel
ee8dce4466 Clear entry detailed on workspace change (#1045) 2022-04-26 15:40:42 +03:00
David Levanon
b5c665b602 set capture time for every packet, so long living readers would be accurate (#1043) 2022-04-26 14:37:44 +03:00
144 changed files with 1788 additions and 1536 deletions

View File

@@ -141,3 +141,42 @@ jobs:
with:
version: latest
working-directory: tap/extensions/redis
- uses: actions/setup-node@v2
with:
node-version: 16
- name: Check modified UI files
id: ui_modified_files
run: devops/check_modified_files.sh ui/
- name: ESLint prerequisites ui
if: steps.ui_modified_files.outputs.matched == 'true'
run: |
sudo npm install -g eslint
cd ui
npm run prestart
npm i
- name: ESLint ui
if: steps.ui_modified_files.outputs.matched == 'true'
run: |
cd ui
npm run eslint
- name: Check modified ui-common files
id: ui_common_modified_files
run: devops/check_modified_files.sh ui-common/
- name: ESLint prerequisites ui-common
if: steps.ui_common_modified_files.outputs.matched == 'true'
run: |
sudo npm install -g eslint
cd ui-common
npm i
- name: ESLint ui-common
if: steps.ui_common_modified_files.outputs.matched == 'true'
run: |
cd ui-common
npm run eslint

View File

@@ -58,6 +58,7 @@ WORKDIR /app/agent-build
COPY agent/go.mod agent/go.sum ./
COPY shared/go.mod shared/go.mod ../shared/
COPY logger/go.mod logger/go.mod ../logger/
COPY tap/go.mod tap/go.mod ../tap/
COPY tap/api/go.mod ../tap/api/
COPY tap/extensions/amqp/go.mod ../tap/extensions/amqp/
@@ -66,10 +67,12 @@ COPY tap/extensions/kafka/go.mod ../tap/extensions/kafka/
COPY tap/extensions/redis/go.mod ../tap/extensions/redis/
RUN go mod download
# cheap trick to make the build faster (as long as go.mod did not change)
RUN go get github.com/patrickmn/go-cache
RUN go list -f '{{.Path}}@{{.Version}}' -m all | sed 1d | grep -e 'go-cache' | xargs go get
# Copy and build agent code
COPY shared ../shared
COPY logger ../logger
COPY tap ../tap
COPY agent .

View File

@@ -29,6 +29,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/up9inc/mizu/logger v0.0.0 // indirect
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect
golang.org/x/sys v0.0.0-20220207234003-57398862261d // indirect
@@ -48,6 +49,8 @@ require (
sigs.k8s.io/yaml v1.3.0 // indirect
)
replace github.com/up9inc/mizu/logger v0.0.0 => ../logger
replace github.com/up9inc/mizu/shared v0.0.0 => ../shared
replace github.com/up9inc/mizu/tap/api v0.0.0 => ../tap/api

View File

@@ -19,9 +19,9 @@ require (
github.com/nav-inc/datetime v0.1.3
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
github.com/orcaman/concurrent-map v1.0.0
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/stretchr/testify v1.7.0
github.com/up9inc/basenine/client/go v0.0.0-20220419100955-e2ca51087607
github.com/up9inc/mizu/logger v0.0.0
github.com/up9inc/mizu/shared v0.0.0
github.com/up9inc/mizu/tap v0.0.0
github.com/up9inc/mizu/tap/api v0.0.0
@@ -135,6 +135,8 @@ require (
sigs.k8s.io/yaml v1.3.0 // indirect
)
replace github.com/up9inc/mizu/logger v0.0.0 => ../logger
replace github.com/up9inc/mizu/shared v0.0.0 => ../shared
replace github.com/up9inc/mizu/tap v0.0.0 => ../tap

View File

@@ -554,8 +554,6 @@ github.com/orcaman/concurrent-map v1.0.0 h1:I/2A2XPCb4IuQWcQhBhSwGfiuybl/J0ev9HD
github.com/orcaman/concurrent-map v1.0.0/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pelletier/go-toml v1.9.3/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=

View File

@@ -33,8 +33,8 @@ import (
"github.com/gorilla/websocket"
"github.com/op/go-logging"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap"
tapApi "github.com/up9inc/mizu/tap/api"
)

View File

@@ -5,7 +5,7 @@ import (
basenine "github.com/up9inc/basenine/client/go"
"github.com/up9inc/mizu/agent/pkg/models"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/logger"
tapApi "github.com/up9inc/mizu/tap/api"
)

View File

@@ -24,8 +24,8 @@ import (
"github.com/up9inc/mizu/agent/pkg/resolver"
"github.com/up9inc/mizu/agent/pkg/utils"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
tapApi "github.com/up9inc/mizu/tap/api"
basenine "github.com/up9inc/basenine/client/go"
@@ -128,11 +128,11 @@ BasenineReconnect:
for item := range outputItems {
extension := extensionsMap[item.Protocol.Name]
resolvedSource, resolvedDestionation, namespace := resolveIP(item.ConnectionInfo)
if namespace == "" && item.Namespace != tapApi.UNKNOWN_NAMESPACE {
namespace = item.Namespace
}
mizuEntry := extension.Dissector.Analyze(item, resolvedSource, resolvedDestionation, namespace)
if extension.Protocol.Name == "http" {
if !disableOASValidation {

View File

@@ -6,8 +6,8 @@ import (
basenine "github.com/up9inc/basenine/client/go"
"github.com/up9inc/mizu/agent/pkg/dependency"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
tapApi "github.com/up9inc/mizu/tap/api"
)

View File

@@ -10,7 +10,7 @@ import (
"github.com/gorilla/websocket"
"github.com/up9inc/mizu/agent/pkg/models"
"github.com/up9inc/mizu/agent/pkg/utils"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/logger"
tapApi "github.com/up9inc/mizu/tap/api"
)

View File

@@ -14,8 +14,8 @@ import (
tapApi "github.com/up9inc/mizu/tap/api"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
)
type BrowserClient struct {

View File

@@ -4,8 +4,8 @@ import (
"encoding/json"
"github.com/up9inc/mizu/agent/pkg/providers/tappedPods"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
)
func BroadcastTappedPodsStatus() {

View File

@@ -10,7 +10,7 @@ import (
basenine "github.com/up9inc/basenine/client/go"
"github.com/up9inc/mizu/agent/pkg/api"
"github.com/up9inc/mizu/agent/pkg/utils"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/logger"
tapApi "github.com/up9inc/mizu/tap/api"
amqpExt "github.com/up9inc/mizu/tap/extensions/amqp"
httpExt "github.com/up9inc/mizu/tap/extensions/http"

View File

@@ -10,7 +10,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/logger"
)
func HandleEntriesError(c *gin.Context, err error) bool {

View File

@@ -7,7 +7,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/up9inc/mizu/agent/pkg/dependency"
"github.com/up9inc/mizu/agent/pkg/oas"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/logger"
)
func GetOASServers(c *gin.Context) {

View File

@@ -13,9 +13,9 @@ import (
"github.com/up9inc/mizu/agent/pkg/providers/tappers"
"github.com/up9inc/mizu/agent/pkg/up9"
"github.com/up9inc/mizu/agent/pkg/validation"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
)
func HealthCheck(c *gin.Context) {

View File

@@ -9,8 +9,8 @@ import (
"time"
"github.com/elastic/go-elasticsearch/v7"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/api"
)

View File

@@ -9,8 +9,8 @@ import (
"github.com/up9inc/mizu/agent/pkg/app"
"github.com/up9inc/mizu/agent/pkg/har"
"github.com/up9inc/mizu/agent/pkg/models"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
tapApi "github.com/up9inc/mizu/tap/api"
)

View File

@@ -2,9 +2,10 @@ package har
import (
"encoding/base64"
"github.com/up9inc/mizu/shared/logger"
"time"
"unicode/utf8"
"github.com/up9inc/mizu/logger"
)
/*

View File

@@ -8,7 +8,7 @@ import (
"strings"
"time"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/logger"
)
// Keep it because we might want cookies in the future

View File

@@ -16,7 +16,7 @@ import (
"github.com/up9inc/mizu/agent/pkg/har"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/logger"
)
func getFiles(baseDir string) (result []string, err error) {

View File

@@ -11,7 +11,7 @@ import (
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/tap/api"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/logger"
)
var (

View File

@@ -17,7 +17,7 @@ import (
"github.com/chanced/openapi"
"github.com/google/uuid"
"github.com/nav-inc/datetime"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/agent/pkg/har"

View File

@@ -13,7 +13,7 @@ import (
"time"
"github.com/chanced/openapi"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/logger"
"github.com/wI2L/jsondiff"
basenine "github.com/up9inc/basenine/client/go"

View File

@@ -8,7 +8,7 @@ import (
"strings"
"github.com/chanced/openapi"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/logger"
)
type NodePath = []string

View File

@@ -9,7 +9,7 @@ import (
"github.com/up9inc/mizu/agent/pkg/har"
"github.com/chanced/openapi"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/logger"
)
func exampleResolver(ref string) (*openapi.ExampleObj, error) {

View File

@@ -7,8 +7,8 @@ import (
"github.com/up9inc/mizu/agent/pkg/providers/tappers"
"github.com/up9inc/mizu/agent/pkg/utils"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
)
const FilePath = shared.DataDirPath + "tapped-pods.json"

View File

@@ -5,8 +5,8 @@ import (
"sync"
"github.com/up9inc/mizu/agent/pkg/utils"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
)
const FilePath = shared.DataDirPath + "tappers-status.json"

View File

@@ -5,7 +5,7 @@ import (
"errors"
"fmt"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/logger"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
cmap "github.com/orcaman/concurrent-map"

View File

@@ -10,7 +10,7 @@ import (
"github.com/up9inc/mizu/agent/pkg/har"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared"
"github.com/yalp/jsonpath"

View File

@@ -1,10 +1,11 @@
package servicemap
import (
"github.com/jinzhu/copier"
"sync"
"github.com/up9inc/mizu/shared/logger"
"github.com/jinzhu/copier"
"github.com/up9inc/mizu/logger"
tapApi "github.com/up9inc/mizu/tap/api"
)

View File

@@ -17,8 +17,8 @@ import (
"github.com/up9inc/mizu/agent/pkg/utils"
basenine "github.com/up9inc/basenine/client/go"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
tapApi "github.com/up9inc/mizu/tap/api"
)

View File

@@ -13,8 +13,8 @@ import (
"time"
"github.com/gin-gonic/gin"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
)
var (

View File

@@ -4,15 +4,16 @@ import (
"bytes"
"encoding/json"
"fmt"
"github.com/up9inc/mizu/cli/utils"
"io/ioutil"
"net/http"
"net/url"
"time"
"github.com/up9inc/mizu/cli/utils"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
core "k8s.io/api/core/v1"
)

View File

@@ -12,7 +12,7 @@ import (
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/logger"
"golang.org/x/oauth2"
)

View File

@@ -5,7 +5,7 @@ import (
"github.com/spf13/cobra"
"github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/telemetry"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/logger"
)
var checkCmd = &cobra.Command{

View File

@@ -3,13 +3,14 @@ package check
import (
"context"
"fmt"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
core "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"regexp"
"time"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared/kubernetes"
core "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func ImagePullInCluster(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool {

View File

@@ -2,14 +2,14 @@ package check
import (
"fmt"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/shared/semver"
)
func KubernetesApi() (*kubernetes.Provider, *semver.SemVersion, bool) {
logger.Log.Infof("\nkubernetes-api\n--------------------")

View File

@@ -4,15 +4,16 @@ import (
"context"
"embed"
"fmt"
"strings"
"github.com/up9inc/mizu/cli/bucket"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
rbac "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
"strings"
)
func TapKubernetesPermissions(ctx context.Context, embedFS embed.FS, kubernetesProvider *kubernetes.Provider) bool {

View File

@@ -3,10 +3,11 @@ package check
import (
"context"
"fmt"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
)
func KubernetesResources(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool {

View File

@@ -2,9 +2,10 @@ package check
import (
"fmt"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/shared/semver"
)

View File

@@ -3,12 +3,13 @@ package check
import (
"context"
"fmt"
"regexp"
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
"regexp"
)
func ServerConnection(kubernetesProvider *kubernetes.Provider) bool {

View File

@@ -4,10 +4,11 @@ import (
"context"
"embed"
"fmt"
"github.com/up9inc/mizu/cli/cmd/check"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/logger"
)
var (

View File

@@ -19,8 +19,8 @@ import (
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
)
func GetApiServerUrl(port uint16) string {

View File

@@ -9,7 +9,7 @@ import (
"github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/telemetry"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/logger"
)
var configCmd = &cobra.Command{

View File

@@ -4,7 +4,7 @@ import (
"reflect"
"runtime/debug"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/logger"
)
func HandleExcWrapper(fn interface{}, params ...interface{}) (result []reflect.Value) {

View File

@@ -2,9 +2,10 @@ package cmd
import (
"fmt"
"github.com/up9inc/mizu/cli/bucket"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/logger"
)
func runMizuInstall() {

View File

@@ -10,7 +10,7 @@ import (
"github.com/up9inc/mizu/cli/errormessage"
"github.com/up9inc/mizu/cli/mizu/fsUtils"
"github.com/up9inc/mizu/cli/telemetry"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/logger"
)
var logsCmd = &cobra.Command{

View File

@@ -11,7 +11,7 @@ import (
"github.com/up9inc/mizu/cli/mizu/fsUtils"
"github.com/up9inc/mizu/cli/mizu/version"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/logger"
)
var rootCmd = &cobra.Command{

View File

@@ -14,8 +14,8 @@ import (
"github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/errormessage"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
)
const uploadTrafficMessageToConfirm = `NOTE: running mizu with --%s flag will upload recorded traffic for further analysis and enriched presentation options.`

View File

@@ -25,9 +25,9 @@ import (
"github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/errormessage"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/api"
)

View File

@@ -7,7 +7,7 @@ import (
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/telemetry"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/logger"
"github.com/creasty/defaults"
"github.com/spf13/cobra"

View File

@@ -6,7 +6,7 @@ import (
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/telemetry"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/logger"
)
var viewCmd = &cobra.Command{

View File

@@ -11,8 +11,8 @@ import (
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/mizu/fsUtils"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
)
func runMizuView() {

View File

@@ -9,8 +9,8 @@ import (
"strconv"
"strings"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
"github.com/creasty/defaults"
"github.com/spf13/cobra"

View File

@@ -11,7 +11,7 @@ import (
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared/units"
)

View File

@@ -11,6 +11,7 @@ require (
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
github.com/spf13/cobra v1.3.0
github.com/spf13/pflag v1.0.5
github.com/up9inc/mizu/logger v0.0.0
github.com/up9inc/mizu/shared v0.0.0
github.com/up9inc/mizu/tap/api v0.0.0
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8
@@ -98,6 +99,8 @@ require (
sigs.k8s.io/yaml v1.3.0 // indirect
)
replace github.com/up9inc/mizu/logger v0.0.0 => ../logger
replace github.com/up9inc/mizu/shared v0.0.0 => ../shared
replace github.com/up9inc/mizu/tap/api v0.0.0 => ../tap/api

View File

@@ -10,8 +10,8 @@ import (
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/mizu"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
)
func GetLogFilePath() string {

View File

@@ -8,7 +8,7 @@ import (
"path/filepath"
"strings"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/logger"
)
func AddFileToZip(zipWriter *zip.Writer, filename string) error {

View File

@@ -13,7 +13,7 @@ import (
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/mizu"
"github.com/up9inc/mizu/cli/pkg/version"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/logger"
"github.com/google/go-github/v37/github"
"github.com/up9inc/mizu/cli/uiUtils"

View File

@@ -3,12 +3,13 @@ package resources
import (
"context"
"fmt"
"github.com/up9inc/mizu/cli/errormessage"
"github.com/up9inc/mizu/cli/mizu/fsUtils"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/cli/utils"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
"k8s.io/apimachinery/pkg/util/wait"
)
@@ -54,7 +55,6 @@ func cleanUpNonRestrictedMode(ctx context.Context, cancel context.CancelFunc, ku
}
}
if resources, err := kubernetesProvider.ListManagedClusterRoleBindings(ctx); err != nil {
resourceDesc := "ClusterRoleBindings"
handleDeletionError(err, resourceDesc, &leftoverResources)

View File

@@ -8,9 +8,9 @@ import (
"github.com/up9inc/mizu/cli/errormessage"
"github.com/up9inc/mizu/cli/mizu"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
core "k8s.io/api/core/v1"
)

View File

@@ -4,14 +4,15 @@ import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"os"
"time"
"github.com/denisbrodbeck/machineid"
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/mizu"
"github.com/up9inc/mizu/shared/logger"
"net/http"
"os"
"time"
"github.com/up9inc/mizu/logger"
)
const telemetryUrl = "https://us-east4-up9-prod.cloudfunctions.net/mizu-telemetry"

View File

@@ -6,7 +6,7 @@ import (
"os"
"strings"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/logger"
)
func AskForConfirmation(s string) bool {

View File

@@ -5,7 +5,7 @@ import (
"os/exec"
"runtime"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/logger"
)
func OpenBrowser(url string) {

View File

@@ -2,10 +2,11 @@ package utils
import (
"context"
"github.com/up9inc/mizu/shared/logger"
"os"
"os/signal"
"syscall"
"github.com/up9inc/mizu/logger"
)
func WaitForFinish(ctx context.Context, cancel context.CancelFunc) {

12
devops/ui-common-pack.sh Executable file
View File

@@ -0,0 +1,12 @@
#!/bin/bash
# exit when any command fails
set -e
dst_folder=$1
echo "dst folder: $dst_folder";
cd $dst_folder/../ui-common
npm i
npm pack
mv up9-mizu-common-0.0.0.tgz $dst_folder

5
logger/go.mod Normal file
View File

@@ -0,0 +1,5 @@
module github.com/up9inc/mizu/logger
go 1.17
require github.com/op/go-logging v0.0.0-20160315200505-970db520ece7

2
logger/go.sum Normal file
View File

@@ -0,0 +1,2 @@
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 h1:lDH9UUVJtmYCjyT0CI4q8xvlXPxeZ0gYCVvWbmPlp88=
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk=

View File

@@ -10,7 +10,6 @@ const (
ValidationRulesFileName = "validation-rules.yaml"
ContractFileName = "contract-oas.yaml"
ConfigFileName = "mizu-config.json"
GoGCEnvVar = "GOGC"
DefaultApiServerPort = 8899
LogLevelEnvVar = "LOG_LEVEL"
MizuAgentImageRepo = "docker.io/up9inc/mizu"

View File

@@ -6,6 +6,7 @@ require (
github.com/docker/go-units v0.4.0
github.com/golang-jwt/jwt/v4 v4.2.0
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
github.com/up9inc/mizu/logger v0.0.0
github.com/up9inc/mizu/tap/api v0.0.0
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
k8s.io/api v0.23.3
@@ -91,4 +92,6 @@ require (
sigs.k8s.io/yaml v1.3.0 // indirect
)
replace github.com/up9inc/mizu/logger v0.0.0 => ../logger
replace github.com/up9inc/mizu/tap/api v0.0.0 => ../tap/api

View File

@@ -7,9 +7,9 @@ import (
"time"
"github.com/op/go-logging"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/debounce"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/api"
core "k8s.io/api/core/v1"
)

View File

@@ -12,8 +12,8 @@ import (
"regexp"
"github.com/op/go-logging"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/shared/semver"
"github.com/up9inc/mizu/tap/api"
auth "k8s.io/api/authorization/v1"
@@ -768,7 +768,6 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
agentContainer.WithEnv(
applyconfcore.EnvVar().WithName(shared.LogLevelEnvVar).WithValue(logLevel.String()),
applyconfcore.EnvVar().WithName(shared.HostModeEnvVar).WithValue("1"),
applyconfcore.EnvVar().WithName(shared.GoGCEnvVar).WithValue("12800"),
applyconfcore.EnvVar().WithName(shared.MizuFilteringOptionsEnvVar).WithValue(string(mizuApiFilteringOptionsJsonStr)),
)
agentContainer.WithEnv(

View File

@@ -16,7 +16,7 @@ import (
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/logger"
"k8s.io/kubectl/pkg/proxy"
)

View File

@@ -7,8 +7,8 @@ import (
"sync"
"time"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared/debounce"
"github.com/up9inc/mizu/shared/logger"
"k8s.io/apimachinery/pkg/watch"
)

View File

@@ -5,7 +5,7 @@ import (
"strings"
"github.com/op/go-logging"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/logger"
"gopkg.in/yaml.v3"
v1 "k8s.io/api/core/v1"

View File

@@ -104,11 +104,7 @@ type OutputChannelItem struct {
Namespace string
}
type SuperTimer struct {
CaptureTime time.Time
}
type SuperIdentifier struct {
type ProtoIdentifier struct {
Protocol *Protocol
IsClosedOthers bool
}
@@ -130,7 +126,7 @@ func (p *ReadProgress) Current() (n int) {
type Dissector interface {
Register(*Extension)
Ping()
Dissect(b *bufio.Reader, progress *ReadProgress, capture Capture, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions, reqResMatcher RequestResponseMatcher) error
Dissect(b *bufio.Reader, reader TcpReader, options *TrafficFilteringOptions) error
Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *Entry
Summarize(entry *Entry) *BaseEntry
Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, err error)
@@ -406,3 +402,39 @@ func (r *HTTPResponseWrapper) MarshalJSON() ([]byte, error) {
Response: r.Response,
})
}
type TcpReaderDataMsg interface {
GetBytes() []byte
GetTimestamp() time.Time
}
type TcpReader interface {
Read(p []byte) (int, error)
GetReqResMatcher() RequestResponseMatcher
GetIsClient() bool
GetReadProgress() *ReadProgress
GetParent() TcpStream
GetTcpID() *TcpID
GetCounterPair() *CounterPair
GetCaptureTime() time.Time
GetEmitter() Emitter
GetIsClosed() bool
GetExtension() *Extension
}
type TcpStream interface {
SetProtocol(protocol *Protocol)
GetOrigin() Capture
GetProtoIdentifier() *ProtoIdentifier
GetReqResMatcher() RequestResponseMatcher
GetIsTapTarget() bool
GetIsClosed() bool
}
type TcpStreamMap interface {
Range(f func(key, value interface{}) bool)
Store(key, value interface{})
Delete(key interface{})
NextId() int64
CloseTimedoutTcpStreamChannels()
}

View File

@@ -3,3 +3,7 @@ module github.com/up9inc/mizu/tap/api
go 1.17
require github.com/google/martian v2.1.0+incompatible
replace github.com/up9inc/mizu/logger v0.0.0 => ../../logger
replace github.com/up9inc/mizu/shared v0.0.0 => ../../shared

View File

@@ -5,7 +5,7 @@ import (
"time"
"github.com/google/gopacket/reassembly"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/tap/api"
)
@@ -22,7 +22,7 @@ type Cleaner struct {
connectionTimeout time.Duration
stats CleanerStats
statsMutex sync.Mutex
streamsMap *tcpStreamMap
streamsMap api.TcpStreamMap
}
func (cl *Cleaner) clean() {
@@ -33,8 +33,8 @@ func (cl *Cleaner) clean() {
flushed, closed := cl.assembler.FlushCloseOlderThan(startCleanTime.Add(-cl.connectionTimeout))
cl.assemblerMutex.Unlock()
cl.streamsMap.streams.Range(func(k, v interface{}) bool {
reqResMatcher := v.(*tcpStreamWrapper).reqResMatcher
cl.streamsMap.Range(func(k, v interface{}) bool {
reqResMatcher := v.(api.TcpStream).GetReqResMatcher()
if reqResMatcher == nil {
return true
}

View File

@@ -8,7 +8,7 @@ import (
"strconv"
"time"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/tap/api"
)

View File

@@ -5,7 +5,7 @@ import (
"sync"
"github.com/google/gopacket/examples/util"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/logger"
)
var TapErrors *errorsMap

View File

@@ -1,6 +1,6 @@
package diagnose
import "github.com/up9inc/mizu/shared/logger"
import "github.com/up9inc/mizu/logger"
type tapperInternalStats struct {
Ipdefrag int

View File

@@ -15,3 +15,5 @@ require (
)
replace github.com/up9inc/mizu/tap/api v0.0.0 => ../../api
replace github.com/up9inc/mizu/logger v0.0.0 => ../../../logger

View File

@@ -39,17 +39,17 @@ func (d dissecting) Ping() {
const amqpRequest string = "amqp_request"
func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.TrafficFilteringOptions) error {
r := AmqpReader{b}
var remaining int
var header *HeaderFrame
connectionInfo := &api.ConnectionInfo{
ClientIP: tcpID.SrcIP,
ClientPort: tcpID.SrcPort,
ServerIP: tcpID.DstIP,
ServerPort: tcpID.DstPort,
ClientIP: reader.GetTcpID().SrcIP,
ClientPort: reader.GetTcpID().SrcPort,
ServerIP: reader.GetTcpID().DstIP,
ServerPort: reader.GetTcpID().DstPort,
IsOutgoing: true,
}
@@ -75,7 +75,7 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture
var lastMethodFrameMessage Message
for {
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &protocol {
if reader.GetParent().GetProtoIdentifier().Protocol != nil && reader.GetParent().GetProtoIdentifier().Protocol != &protocol {
return errors.New("Identified by another protocol")
}
@@ -112,12 +112,12 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture
switch lastMethodFrameMessage.(type) {
case *BasicPublish:
eventBasicPublish.Body = f.Body
superIdentifier.Protocol = &protocol
emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture)
reader.GetParent().SetProtocol(&protocol)
emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
case *BasicDeliver:
eventBasicDeliver.Body = f.Body
superIdentifier.Protocol = &protocol
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture)
reader.GetParent().SetProtocol(&protocol)
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
}
case *MethodFrame:
@@ -137,8 +137,8 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture
NoWait: m.NoWait,
Arguments: m.Arguments,
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture)
reader.GetParent().SetProtocol(&protocol)
emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
case *BasicConsume:
eventBasicConsume := &BasicConsume{
@@ -150,8 +150,8 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture
NoWait: m.NoWait,
Arguments: m.Arguments,
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture)
reader.GetParent().SetProtocol(&protocol)
emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
case *BasicDeliver:
eventBasicDeliver.ConsumerTag = m.ConsumerTag
@@ -170,8 +170,8 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture
NoWait: m.NoWait,
Arguments: m.Arguments,
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture)
reader.GetParent().SetProtocol(&protocol)
emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
case *ExchangeDeclare:
eventExchangeDeclare := &ExchangeDeclare{
@@ -184,8 +184,8 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture
NoWait: m.NoWait,
Arguments: m.Arguments,
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture)
reader.GetParent().SetProtocol(&protocol)
emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
case *ConnectionStart:
eventConnectionStart := &ConnectionStart{
@@ -195,8 +195,8 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture
Mechanisms: m.Mechanisms,
Locales: m.Locales,
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture)
reader.GetParent().SetProtocol(&protocol)
emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
case *ConnectionClose:
eventConnectionClose := &ConnectionClose{
@@ -205,8 +205,8 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture
ClassId: m.ClassId,
MethodId: m.MethodId,
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture)
reader.GetParent().SetProtocol(&protocol)
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, reader.GetCaptureTime(), reader.GetReadProgress().Current(), reader.GetEmitter(), reader.GetParent().GetOrigin())
}
default:

View File

@@ -106,7 +106,6 @@ func TestDissect(t *testing.T) {
Request: 0,
Response: 0,
}
superIdentifier := &api.SuperIdentifier{}
// Request
pathClient := _path
@@ -122,7 +121,21 @@ func TestDissect(t *testing.T) {
DstPort: "2",
}
reqResMatcher := dissector.NewResponseRequestMatcher()
err = dissector.Dissect(bufferClient, &api.ReadProgress{}, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
stream := NewTcpStream(api.Pcap)
reader := NewTcpReader(
&api.ReadProgress{},
"",
tcpIDClient,
time.Time{},
stream,
true,
false,
nil,
emitter,
counterPair,
reqResMatcher,
)
err = dissector.Dissect(bufferClient, reader, options)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err)
}
@@ -140,7 +153,20 @@ func TestDissect(t *testing.T) {
SrcPort: "2",
DstPort: "1",
}
err = dissector.Dissect(bufferServer, &api.ReadProgress{}, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
reader = NewTcpReader(
&api.ReadProgress{},
"",
tcpIDServer,
time.Time{},
stream,
false,
false,
nil,
emitter,
counterPair,
reqResMatcher,
)
err = dissector.Dissect(bufferServer, reader, options)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err)
}

View File

@@ -0,0 +1,84 @@
package amqp
import (
"sync"
"time"
"github.com/up9inc/mizu/tap/api"
)
type tcpReader struct {
ident string
tcpID *api.TcpID
isClosed bool
isClient bool
isOutgoing bool
progress *api.ReadProgress
captureTime time.Time
parent api.TcpStream
extension *api.Extension
emitter api.Emitter
counterPair *api.CounterPair
reqResMatcher api.RequestResponseMatcher
sync.Mutex
}
func NewTcpReader(progress *api.ReadProgress, ident string, tcpId *api.TcpID, captureTime time.Time, parent api.TcpStream, isClient bool, isOutgoing bool, extension *api.Extension, emitter api.Emitter, counterPair *api.CounterPair, reqResMatcher api.RequestResponseMatcher) api.TcpReader {
return &tcpReader{
progress: progress,
ident: ident,
tcpID: tcpId,
captureTime: captureTime,
parent: parent,
isClient: isClient,
isOutgoing: isOutgoing,
extension: extension,
emitter: emitter,
counterPair: counterPair,
reqResMatcher: reqResMatcher,
}
}
func (reader *tcpReader) Read(p []byte) (int, error) {
return 0, nil
}
func (reader *tcpReader) GetReqResMatcher() api.RequestResponseMatcher {
return reader.reqResMatcher
}
func (reader *tcpReader) GetIsClient() bool {
return reader.isClient
}
func (reader *tcpReader) GetReadProgress() *api.ReadProgress {
return reader.progress
}
func (reader *tcpReader) GetParent() api.TcpStream {
return reader.parent
}
func (reader *tcpReader) GetTcpID() *api.TcpID {
return reader.tcpID
}
func (reader *tcpReader) GetCounterPair() *api.CounterPair {
return reader.counterPair
}
func (reader *tcpReader) GetCaptureTime() time.Time {
return reader.captureTime
}
func (reader *tcpReader) GetEmitter() api.Emitter {
return reader.emitter
}
func (reader *tcpReader) GetIsClosed() bool {
return reader.isClosed
}
func (reader *tcpReader) GetExtension() *api.Extension {
return reader.extension
}

View File

@@ -0,0 +1,45 @@
package amqp
import (
"sync"
"github.com/up9inc/mizu/tap/api"
)
type tcpStream struct {
isClosed bool
protoIdentifier *api.ProtoIdentifier
isTapTarget bool
origin api.Capture
reqResMatcher api.RequestResponseMatcher
sync.Mutex
}
func NewTcpStream(capture api.Capture) api.TcpStream {
return &tcpStream{
origin: capture,
protoIdentifier: &api.ProtoIdentifier{},
}
}
func (t *tcpStream) SetProtocol(protocol *api.Protocol) {}
func (t *tcpStream) GetOrigin() api.Capture {
return t.origin
}
func (t *tcpStream) GetProtoIdentifier() *api.ProtoIdentifier {
return t.protoIdentifier
}
func (t *tcpStream) GetReqResMatcher() api.RequestResponseMatcher {
return t.reqResMatcher
}
func (t *tcpStream) GetIsTapTarget() bool {
return t.isTapTarget
}
func (t *tcpStream) GetIsClosed() bool {
return t.isClosed
}

View File

@@ -18,3 +18,5 @@ require (
)
replace github.com/up9inc/mizu/tap/api v0.0.0 => ../../api
replace github.com/up9inc/mizu/logger v0.0.0 => ../../../logger

View File

@@ -8,6 +8,7 @@ import (
"io/ioutil"
"net/http"
"strings"
"time"
"github.com/up9inc/mizu/tap/api"
)
@@ -47,7 +48,7 @@ func replaceForwardedFor(item *api.OutputChannelItem) {
item.ConnectionInfo.ClientPort = ""
}
func handleHTTP2Stream(http2Assembler *Http2Assembler, progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) error {
func handleHTTP2Stream(http2Assembler *Http2Assembler, progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, captureTime time.Time, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) error {
streamID, messageHTTP1, isGrpc, err := http2Assembler.readMessage()
if err != nil {
return err
@@ -66,7 +67,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, progress *api.ReadProgres
streamID,
"HTTP2",
)
item = reqResMatcher.registerRequest(ident, &messageHTTP1, superTimer.CaptureTime, progress.Current(), messageHTTP1.ProtoMinor)
item = reqResMatcher.registerRequest(ident, &messageHTTP1, captureTime, progress.Current(), messageHTTP1.ProtoMinor)
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.SrcIP,
@@ -86,7 +87,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, progress *api.ReadProgres
streamID,
"HTTP2",
)
item = reqResMatcher.registerResponse(ident, &messageHTTP1, superTimer.CaptureTime, progress.Current(), messageHTTP1.ProtoMinor)
item = reqResMatcher.registerResponse(ident, &messageHTTP1, captureTime, progress.Current(), messageHTTP1.ProtoMinor)
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.DstIP,
@@ -111,7 +112,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, progress *api.ReadProgres
return nil
}
func handleHTTP1ClientStream(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, req *http.Request, err error) {
func handleHTTP1ClientStream(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, captureTime time.Time, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, req *http.Request, err error) {
req, err = http.ReadRequest(b)
if err != nil {
return
@@ -139,7 +140,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, progress *api.ReadProgress, captur
requestCounter,
"HTTP1",
)
item := reqResMatcher.registerRequest(ident, req, superTimer.CaptureTime, progress.Current(), req.ProtoMinor)
item := reqResMatcher.registerRequest(ident, req, captureTime, progress.Current(), req.ProtoMinor)
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.SrcIP,
@@ -154,7 +155,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, progress *api.ReadProgress, captur
return
}
func handleHTTP1ServerStream(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, err error) {
func handleHTTP1ServerStream(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, captureTime time.Time, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, err error) {
var res *http.Response
res, err = http.ReadResponse(b, nil)
if err != nil {
@@ -183,7 +184,7 @@ func handleHTTP1ServerStream(b *bufio.Reader, progress *api.ReadProgress, captur
responseCounter,
"HTTP1",
)
item := reqResMatcher.registerResponse(ident, res, superTimer.CaptureTime, progress.Current(), res.ProtoMinor)
item := reqResMatcher.registerResponse(ident, res, captureTime, progress.Current(), res.ProtoMinor)
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.DstIP,

View File

@@ -86,15 +86,15 @@ func (d dissecting) Ping() {
log.Printf("pong %s", http11protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher)
func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.TrafficFilteringOptions) error {
reqResMatcher := reader.GetReqResMatcher().(*requestResponseMatcher)
var err error
isHTTP2, _ := checkIsHTTP2Connection(b, isClient)
isHTTP2, _ := checkIsHTTP2Connection(b, reader.GetIsClient())
var http2Assembler *Http2Assembler
if isHTTP2 {
err = prepareHTTP2Connection(b, isClient)
err = prepareHTTP2Connection(b, reader.GetIsClient())
if err != nil {
return err
}
@@ -105,74 +105,74 @@ func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture
for {
if switchingProtocolsHTTP2 {
switchingProtocolsHTTP2 = false
isHTTP2, err = checkIsHTTP2Connection(b, isClient)
isHTTP2, err = checkIsHTTP2Connection(b, reader.GetIsClient())
if err != nil {
break
}
err = prepareHTTP2Connection(b, isClient)
err = prepareHTTP2Connection(b, reader.GetIsClient())
if err != nil {
break
}
http2Assembler = createHTTP2Assembler(b)
}
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &http11protocol {
if reader.GetParent().GetProtoIdentifier().Protocol != nil && reader.GetParent().GetProtoIdentifier().Protocol != &http11protocol {
return errors.New("Identified by another protocol")
}
if isHTTP2 {
err = handleHTTP2Stream(http2Assembler, progress, capture, tcpID, superTimer, emitter, options, reqResMatcher)
err = handleHTTP2Stream(http2Assembler, reader.GetReadProgress(), reader.GetParent().GetOrigin(), reader.GetTcpID(), reader.GetCaptureTime(), reader.GetEmitter(), options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
continue
}
superIdentifier.Protocol = &http11protocol
} else if isClient {
reader.GetParent().SetProtocol(&http11protocol)
} else if reader.GetIsClient() {
var req *http.Request
switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, progress, capture, tcpID, counterPair, superTimer, emitter, options, reqResMatcher)
switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, reader.GetReadProgress(), reader.GetParent().GetOrigin(), reader.GetTcpID(), reader.GetCounterPair(), reader.GetCaptureTime(), reader.GetEmitter(), options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
continue
}
superIdentifier.Protocol = &http11protocol
reader.GetParent().SetProtocol(&http11protocol)
// In case of an HTTP2 upgrade, duplicate the HTTP1 request into HTTP2 with stream ID 1
if switchingProtocolsHTTP2 {
ident := fmt.Sprintf(
"%s_%s_%s_%s_1_%s",
tcpID.SrcIP,
tcpID.DstIP,
tcpID.SrcPort,
tcpID.DstPort,
reader.GetTcpID().SrcIP,
reader.GetTcpID().DstIP,
reader.GetTcpID().SrcPort,
reader.GetTcpID().DstPort,
"HTTP2",
)
item := reqResMatcher.registerRequest(ident, req, superTimer.CaptureTime, progress.Current(), req.ProtoMinor)
item := reqResMatcher.registerRequest(ident, req, reader.GetCaptureTime(), reader.GetReadProgress().Current(), req.ProtoMinor)
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.SrcIP,
ClientPort: tcpID.SrcPort,
ServerIP: tcpID.DstIP,
ServerPort: tcpID.DstPort,
ClientIP: reader.GetTcpID().SrcIP,
ClientPort: reader.GetTcpID().SrcPort,
ServerIP: reader.GetTcpID().DstIP,
ServerPort: reader.GetTcpID().DstPort,
IsOutgoing: true,
}
item.Capture = capture
filterAndEmit(item, emitter, options)
item.Capture = reader.GetParent().GetOrigin()
filterAndEmit(item, reader.GetEmitter(), options)
}
}
} else {
switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, progress, capture, tcpID, counterPair, superTimer, emitter, options, reqResMatcher)
switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, reader.GetReadProgress(), reader.GetParent().GetOrigin(), reader.GetTcpID(), reader.GetCounterPair(), reader.GetCaptureTime(), reader.GetEmitter(), options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
continue
}
superIdentifier.Protocol = &http11protocol
reader.GetParent().SetProtocol(&http11protocol)
}
}
if superIdentifier.Protocol == nil {
if reader.GetParent().GetProtoIdentifier().Protocol == nil {
return err
}

View File

@@ -108,7 +108,6 @@ func TestDissect(t *testing.T) {
Request: 0,
Response: 0,
}
superIdentifier := &api.SuperIdentifier{}
// Request
pathClient := _path
@@ -124,7 +123,21 @@ func TestDissect(t *testing.T) {
DstPort: "2",
}
reqResMatcher := dissector.NewResponseRequestMatcher()
err = dissector.Dissect(bufferClient, &api.ReadProgress{}, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
stream := NewTcpStream(api.Pcap)
reader := NewTcpReader(
&api.ReadProgress{},
"",
tcpIDClient,
time.Time{},
stream,
true,
false,
nil,
emitter,
counterPair,
reqResMatcher,
)
err = dissector.Dissect(bufferClient, reader, options)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err)
}
@@ -142,7 +155,20 @@ func TestDissect(t *testing.T) {
SrcPort: "2",
DstPort: "1",
}
err = dissector.Dissect(bufferServer, &api.ReadProgress{}, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
reader = NewTcpReader(
&api.ReadProgress{},
"",
tcpIDServer,
time.Time{},
stream,
false,
false,
nil,
emitter,
counterPair,
reqResMatcher,
)
err = dissector.Dissect(bufferServer, reader, options)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err)
}

View File

@@ -0,0 +1,84 @@
package http
import (
"sync"
"time"
"github.com/up9inc/mizu/tap/api"
)
type tcpReader struct {
ident string
tcpID *api.TcpID
isClosed bool
isClient bool
isOutgoing bool
progress *api.ReadProgress
captureTime time.Time
parent api.TcpStream
extension *api.Extension
emitter api.Emitter
counterPair *api.CounterPair
reqResMatcher api.RequestResponseMatcher
sync.Mutex
}
func NewTcpReader(progress *api.ReadProgress, ident string, tcpId *api.TcpID, captureTime time.Time, parent api.TcpStream, isClient bool, isOutgoing bool, extension *api.Extension, emitter api.Emitter, counterPair *api.CounterPair, reqResMatcher api.RequestResponseMatcher) api.TcpReader {
return &tcpReader{
progress: progress,
ident: ident,
tcpID: tcpId,
captureTime: captureTime,
parent: parent,
isClient: isClient,
isOutgoing: isOutgoing,
extension: extension,
emitter: emitter,
counterPair: counterPair,
reqResMatcher: reqResMatcher,
}
}
func (reader *tcpReader) Read(p []byte) (int, error) {
return 0, nil
}
func (reader *tcpReader) GetReqResMatcher() api.RequestResponseMatcher {
return reader.reqResMatcher
}
func (reader *tcpReader) GetIsClient() bool {
return reader.isClient
}
func (reader *tcpReader) GetReadProgress() *api.ReadProgress {
return reader.progress
}
func (reader *tcpReader) GetParent() api.TcpStream {
return reader.parent
}
func (reader *tcpReader) GetTcpID() *api.TcpID {
return reader.tcpID
}
func (reader *tcpReader) GetCounterPair() *api.CounterPair {
return reader.counterPair
}
func (reader *tcpReader) GetCaptureTime() time.Time {
return reader.captureTime
}
func (reader *tcpReader) GetEmitter() api.Emitter {
return reader.emitter
}
func (reader *tcpReader) GetIsClosed() bool {
return reader.isClosed
}
func (reader *tcpReader) GetExtension() *api.Extension {
return reader.extension
}

View File

@@ -0,0 +1,45 @@
package http
import (
"sync"
"github.com/up9inc/mizu/tap/api"
)
type tcpStream struct {
isClosed bool
protoIdentifier *api.ProtoIdentifier
isTapTarget bool
origin api.Capture
reqResMatcher api.RequestResponseMatcher
sync.Mutex
}
func NewTcpStream(capture api.Capture) api.TcpStream {
return &tcpStream{
origin: capture,
protoIdentifier: &api.ProtoIdentifier{},
}
}
func (t *tcpStream) SetProtocol(protocol *api.Protocol) {}
func (t *tcpStream) GetOrigin() api.Capture {
return t.origin
}
func (t *tcpStream) GetProtoIdentifier() *api.ProtoIdentifier {
return t.protoIdentifier
}
func (t *tcpStream) GetReqResMatcher() api.RequestResponseMatcher {
return t.reqResMatcher
}
func (t *tcpStream) GetIsTapTarget() bool {
return t.isTapTarget
}
func (t *tcpStream) GetIsClosed() bool {
return t.isClosed
}

View File

@@ -22,3 +22,5 @@ require (
)
replace github.com/up9inc/mizu/tap/api v0.0.0 => ../../api
replace github.com/up9inc/mizu/logger v0.0.0 => ../../../logger

View File

@@ -35,25 +35,25 @@ func (d dissecting) Ping() {
log.Printf("pong %s", _protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher)
func (d dissecting) Dissect(b *bufio.Reader, reader api.TcpReader, options *api.TrafficFilteringOptions) error {
reqResMatcher := reader.GetReqResMatcher().(*requestResponseMatcher)
for {
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &_protocol {
if reader.GetParent().GetProtoIdentifier().Protocol != nil && reader.GetParent().GetProtoIdentifier().Protocol != &_protocol {
return errors.New("Identified by another protocol")
}
if isClient {
_, _, err := ReadRequest(b, tcpID, counterPair, superTimer, reqResMatcher)
if reader.GetIsClient() {
_, _, err := ReadRequest(b, reader.GetTcpID(), reader.GetCounterPair(), reader.GetCaptureTime(), reqResMatcher)
if err != nil {
return err
}
superIdentifier.Protocol = &_protocol
reader.GetParent().SetProtocol(&_protocol)
} else {
err := ReadResponse(b, capture, tcpID, counterPair, superTimer, emitter, reqResMatcher)
err := ReadResponse(b, reader.GetParent().GetOrigin(), reader.GetTcpID(), reader.GetCounterPair(), reader.GetCaptureTime(), reader.GetEmitter(), reqResMatcher)
if err != nil {
return err
}
superIdentifier.Protocol = &_protocol
reader.GetParent().SetProtocol(&_protocol)
}
}
}

View File

@@ -106,7 +106,6 @@ func TestDissect(t *testing.T) {
Request: 0,
Response: 0,
}
superIdentifier := &api.SuperIdentifier{}
// Request
pathClient := _path
@@ -123,7 +122,21 @@ func TestDissect(t *testing.T) {
}
reqResMatcher := dissector.NewResponseRequestMatcher()
reqResMatcher.SetMaxTry(10)
err = dissector.Dissect(bufferClient, &api.ReadProgress{}, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
stream := NewTcpStream(api.Pcap)
reader := NewTcpReader(
&api.ReadProgress{},
"",
tcpIDClient,
time.Time{},
stream,
true,
false,
nil,
emitter,
counterPair,
reqResMatcher,
)
err = dissector.Dissect(bufferClient, reader, options)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
log.Println(err)
}
@@ -141,7 +154,20 @@ func TestDissect(t *testing.T) {
SrcPort: "2",
DstPort: "1",
}
err = dissector.Dissect(bufferServer, &api.ReadProgress{}, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
reader = NewTcpReader(
&api.ReadProgress{},
"",
tcpIDServer,
time.Time{},
stream,
false,
false,
nil,
emitter,
counterPair,
reqResMatcher,
)
err = dissector.Dissect(bufferServer, reader, options)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
log.Println(err)
}

View File

@@ -19,7 +19,7 @@ type Request struct {
CaptureTime time.Time `json:"captureTime"`
}
func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, reqResMatcher *requestResponseMatcher) (apiKey ApiKey, apiVersion int16, err error) {
func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, captureTime time.Time, reqResMatcher *requestResponseMatcher) (apiKey ApiKey, apiVersion int16, err error) {
d := &decoder{reader: r, remain: 4}
size := d.readInt32()
@@ -206,7 +206,7 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, su
ApiVersion: apiVersion,
CorrelationID: correlationID,
ClientID: clientID,
CaptureTime: superTimer.CaptureTime,
CaptureTime: captureTime,
Payload: payload,
}

View File

@@ -16,7 +16,7 @@ type Response struct {
CaptureTime time.Time `json:"captureTime"`
}
func ReadResponse(r io.Reader, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, reqResMatcher *requestResponseMatcher) (err error) {
func ReadResponse(r io.Reader, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, captureTime time.Time, emitter api.Emitter, reqResMatcher *requestResponseMatcher) (err error) {
d := &decoder{reader: r, remain: 4}
size := d.readInt32()
@@ -43,7 +43,7 @@ func ReadResponse(r io.Reader, capture api.Capture, tcpID *api.TcpID, counterPai
Size: size,
CorrelationID: correlationID,
Payload: payload,
CaptureTime: superTimer.CaptureTime,
CaptureTime: captureTime,
}
key := fmt.Sprintf(

View File

@@ -0,0 +1,84 @@
package kafka
import (
"sync"
"time"
"github.com/up9inc/mizu/tap/api"
)
type tcpReader struct {
ident string
tcpID *api.TcpID
isClosed bool
isClient bool
isOutgoing bool
progress *api.ReadProgress
captureTime time.Time
parent api.TcpStream
extension *api.Extension
emitter api.Emitter
counterPair *api.CounterPair
reqResMatcher api.RequestResponseMatcher
sync.Mutex
}
func NewTcpReader(progress *api.ReadProgress, ident string, tcpId *api.TcpID, captureTime time.Time, parent api.TcpStream, isClient bool, isOutgoing bool, extension *api.Extension, emitter api.Emitter, counterPair *api.CounterPair, reqResMatcher api.RequestResponseMatcher) api.TcpReader {
return &tcpReader{
progress: progress,
ident: ident,
tcpID: tcpId,
captureTime: captureTime,
parent: parent,
isClient: isClient,
isOutgoing: isOutgoing,
extension: extension,
emitter: emitter,
counterPair: counterPair,
reqResMatcher: reqResMatcher,
}
}
func (reader *tcpReader) Read(p []byte) (int, error) {
return 0, nil
}
func (reader *tcpReader) GetReqResMatcher() api.RequestResponseMatcher {
return reader.reqResMatcher
}
func (reader *tcpReader) GetIsClient() bool {
return reader.isClient
}
func (reader *tcpReader) GetReadProgress() *api.ReadProgress {
return reader.progress
}
func (reader *tcpReader) GetParent() api.TcpStream {
return reader.parent
}
func (reader *tcpReader) GetTcpID() *api.TcpID {
return reader.tcpID
}
func (reader *tcpReader) GetCounterPair() *api.CounterPair {
return reader.counterPair
}
func (reader *tcpReader) GetCaptureTime() time.Time {
return reader.captureTime
}
func (reader *tcpReader) GetEmitter() api.Emitter {
return reader.emitter
}
func (reader *tcpReader) GetIsClosed() bool {
return reader.isClosed
}
func (reader *tcpReader) GetExtension() *api.Extension {
return reader.extension
}

Some files were not shown because too many files have changed in this diff Show More