mirror of
https://github.com/kubeshark/kubeshark.git
synced 2026-02-18 03:50:06 +00:00
Compare commits
14 Commits
31.0-dev45
...
31.0-dev59
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d834fcc3cb | ||
|
|
1624b0d7b9 | ||
|
|
9715bb046b | ||
|
|
65e5ebe23c | ||
|
|
30986c3b22 | ||
|
|
1e167f2757 | ||
|
|
149e86d050 | ||
|
|
1213162b85 | ||
|
|
189c158150 | ||
|
|
c5006e5f57 | ||
|
|
d7fcf273c0 | ||
|
|
eca3267b47 | ||
|
|
a527fc6c51 | ||
|
|
e104128df8 |
38
.github/workflows/acceptance_tests_on_pr.yml
vendored
Normal file
38
.github/workflows/acceptance_tests_on_pr.yml
vendored
Normal file
@@ -0,0 +1,38 @@
|
||||
name: Acceptance tests on PR
|
||||
|
||||
on: push
|
||||
|
||||
concurrency:
|
||||
group: acceptance-tests-on-pr-${{ github.ref }}
|
||||
cancel-in-progress: true
|
||||
|
||||
jobs:
|
||||
run-tests:
|
||||
name: Run tests
|
||||
runs-on: ubuntu-latest
|
||||
if: ${{ contains(github.event.head_commit.message, '#run_acceptance_tests') }}
|
||||
|
||||
steps:
|
||||
- name: Set up Go 1.17
|
||||
uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: '^1.17'
|
||||
|
||||
- name: Check out code into the Go module directory
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Setup acceptance test
|
||||
run: ./acceptanceTests/setup.sh
|
||||
|
||||
- name: Create k8s users and change context
|
||||
env:
|
||||
USERNAME_UNRESTRICTED: user-with-clusterwide-access
|
||||
USERNAME_RESTRICTED: user-with-restricted-access
|
||||
run: |
|
||||
./acceptanceTests/create_user.sh "${USERNAME_UNRESTRICTED}"
|
||||
./acceptanceTests/create_user.sh "${USERNAME_RESTRICTED}"
|
||||
kubectl apply -f cli/cmd/permissionFiles/permissions-all-namespaces-tap.yaml
|
||||
kubectl config use-context ${USERNAME_UNRESTRICTED}
|
||||
|
||||
- name: Test
|
||||
run: make acceptance-test
|
||||
3
.github/workflows/test.yml
vendored
3
.github/workflows/test.yml
vendored
@@ -56,7 +56,7 @@ jobs:
|
||||
|
||||
- name: Check extensions modified files
|
||||
id: ext_modified_files
|
||||
run: devops/check_modified_files.sh tap/extensions/
|
||||
run: devops/check_modified_files.sh tap/extensions/ tap/api/
|
||||
|
||||
- name: Extensions Test
|
||||
if: github.event_name == 'push' || steps.ext_modified_files.outputs.matched == 'true'
|
||||
@@ -64,4 +64,3 @@ jobs:
|
||||
|
||||
- name: Upload coverage to Codecov
|
||||
uses: codecov/codecov-action@v2
|
||||
|
||||
|
||||
@@ -87,8 +87,8 @@ RUN go build -ldflags="-extldflags=-static -s -w \
|
||||
-X 'github.com/up9inc/mizu/agent/pkg/version.Ver=${VER}'" -o mizuagent .
|
||||
|
||||
# Download Basenine executable, verify the sha1sum
|
||||
ADD https://github.com/up9inc/basenine/releases/download/v0.7.2/basenine_linux_${GOARCH} ./basenine_linux_${GOARCH}
|
||||
ADD https://github.com/up9inc/basenine/releases/download/v0.7.2/basenine_linux_${GOARCH}.sha256 ./basenine_linux_${GOARCH}.sha256
|
||||
ADD https://github.com/up9inc/basenine/releases/download/v0.7.3/basenine_linux_${GOARCH} ./basenine_linux_${GOARCH}
|
||||
ADD https://github.com/up9inc/basenine/releases/download/v0.7.3/basenine_linux_${GOARCH}.sha256 ./basenine_linux_${GOARCH}.sha256
|
||||
|
||||
RUN shasum -a 256 -c basenine_linux_"${GOARCH}".sha256 && \
|
||||
chmod +x ./basenine_linux_"${GOARCH}" && \
|
||||
|
||||
@@ -248,7 +248,7 @@ function deeperCheck(leftSidePath, rightSidePath, filterName, leftSideExpectedTe
|
||||
const entryId = getEntryId(element[0].id);
|
||||
leftOnHoverCheck(entryId, leftSidePath, filterName);
|
||||
|
||||
element.click();
|
||||
cy.get(`#list #entry-${entryId}`).click();
|
||||
rightTextCheck(rightSidePath, rightSideExpectedText);
|
||||
rightOnHoverCheck(rightSidePath, filterName);
|
||||
}
|
||||
|
||||
@@ -19,9 +19,9 @@ require (
|
||||
github.com/nav-inc/datetime v0.1.3
|
||||
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
|
||||
github.com/orcaman/concurrent-map v1.0.0
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible
|
||||
github.com/stretchr/testify v1.7.0
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20220413173135-69508ca741d7
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20220419100955-e2ca51087607
|
||||
github.com/up9inc/mizu/shared v0.0.0
|
||||
github.com/up9inc/mizu/tap v0.0.0
|
||||
github.com/up9inc/mizu/tap/api v0.0.0
|
||||
@@ -49,7 +49,6 @@ require (
|
||||
github.com/PuerkitoBio/purell v1.1.1 // indirect
|
||||
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
|
||||
github.com/beevik/etree v1.1.0 // indirect
|
||||
github.com/bradleyfalzon/tlsx v0.0.0-20170624122154-28fd0e59bac4 // indirect
|
||||
github.com/chai2010/gettext-go v0.0.0-20160711120539-c6fed771bfd5 // indirect
|
||||
github.com/chanced/dynamic v0.0.0-20211210164248-f8fadb1d735b // indirect
|
||||
github.com/cilium/ebpf v0.8.0 // indirect
|
||||
|
||||
@@ -108,8 +108,6 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r
|
||||
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
|
||||
github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqOes/6LfM=
|
||||
github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
|
||||
github.com/bradleyfalzon/tlsx v0.0.0-20170624122154-28fd0e59bac4 h1:NJOOlc6ZJjix0A1rAU+nxruZtR8KboG1848yqpIUo4M=
|
||||
github.com/bradleyfalzon/tlsx v0.0.0-20170624122154-28fd0e59bac4/go.mod h1:DQPxZS994Ld1Y8uwnJT+dRL04XPD0cElP/pHH/zEBHM=
|
||||
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
|
||||
github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
|
||||
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
|
||||
@@ -683,8 +681,8 @@ github.com/ugorji/go v1.2.6/go.mod h1:anCg0y61KIhDlPZmnH+so+RQbysYVyDko0IMgJv0Nn
|
||||
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
|
||||
github.com/ugorji/go/codec v1.2.6 h1:7kbGefxLoDBuYXOms4yD7223OpNMMPNPZxXk5TvFcyQ=
|
||||
github.com/ugorji/go/codec v1.2.6/go.mod h1:V6TCNZ4PHqoHGFZuSG1W8nrCzzdgA2DozYxWFFpvxTw=
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20220413173135-69508ca741d7 h1:9aciby1Byjn50gVXpOuvWSe48GdSK1uS2bcBKMZYHKI=
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20220413173135-69508ca741d7/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20220419100955-e2ca51087607 h1:UqxUSkOYOmsLZWQtMSk02ttnhdRwBRLOLt2aDiS9tEk=
|
||||
github.com/up9inc/basenine/client/go v0.0.0-20220419100955-e2ca51087607/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
|
||||
github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74 h1:gga7acRE695APm9hlsSMoOoE65U4/TcqNj90mc69Rlg=
|
||||
github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
|
||||
github.com/wI2L/jsondiff v0.1.1 h1:r2TkoEet7E4JMO5+s1RCY2R0LrNPNHY6hbDeow2hRHw=
|
||||
|
||||
@@ -103,12 +103,18 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
|
||||
panic("Channel of captured messages is nil")
|
||||
}
|
||||
|
||||
BasenineReconnect:
|
||||
connection, err := basenine.NewConnection(shared.BasenineHost, shared.BaseninePort)
|
||||
if err != nil {
|
||||
logger.Log.Panicf("Can't establish a new connection to Basenine server: %v", err)
|
||||
logger.Log.Errorf("Can't establish a new connection to Basenine server: %v", err)
|
||||
time.Sleep(shared.BasenineReconnectInterval * time.Second)
|
||||
goto BasenineReconnect
|
||||
}
|
||||
if err = connection.InsertMode(); err != nil {
|
||||
logger.Log.Panicf("Insert mode call failed: %v", err)
|
||||
logger.Log.Errorf("Insert mode call failed: %v", err)
|
||||
connection.Close()
|
||||
time.Sleep(shared.BasenineReconnectInterval * time.Second)
|
||||
goto BasenineReconnect
|
||||
}
|
||||
|
||||
disableOASValidation := false
|
||||
@@ -122,19 +128,24 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
|
||||
for item := range outputItems {
|
||||
extension := extensionsMap[item.Protocol.Name]
|
||||
resolvedSource, resolvedDestionation, namespace := resolveIP(item.ConnectionInfo)
|
||||
|
||||
if namespace == "" && item.Namespace != tapApi.UNKNOWN_NAMESPACE {
|
||||
namespace = item.Namespace
|
||||
}
|
||||
|
||||
mizuEntry := extension.Dissector.Analyze(item, resolvedSource, resolvedDestionation, namespace)
|
||||
if extension.Protocol.Name == "http" {
|
||||
if !disableOASValidation {
|
||||
var httpPair tapApi.HTTPRequestResponsePair
|
||||
if err := json.Unmarshal([]byte(mizuEntry.HTTPPair), &httpPair); err != nil {
|
||||
logger.Log.Error(err)
|
||||
} else {
|
||||
contract := handleOAS(ctx, doc, router, httpPair.Request.Payload.RawRequest, httpPair.Response.Payload.RawResponse, contractContent)
|
||||
mizuEntry.ContractStatus = contract.Status
|
||||
mizuEntry.ContractRequestReason = contract.RequestReason
|
||||
mizuEntry.ContractResponseReason = contract.ResponseReason
|
||||
mizuEntry.ContractContent = contract.Content
|
||||
}
|
||||
|
||||
contract := handleOAS(ctx, doc, router, httpPair.Request.Payload.RawRequest, httpPair.Response.Payload.RawResponse, contractContent)
|
||||
mizuEntry.ContractStatus = contract.Status
|
||||
mizuEntry.ContractRequestReason = contract.RequestReason
|
||||
mizuEntry.ContractResponseReason = contract.ResponseReason
|
||||
mizuEntry.ContractContent = contract.Content
|
||||
}
|
||||
|
||||
harEntry, err := har.NewEntry(mizuEntry.Request, mizuEntry.Response, mizuEntry.StartTime, mizuEntry.ElapsedTime)
|
||||
@@ -146,13 +157,17 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
|
||||
|
||||
data, err := json.Marshal(mizuEntry)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
logger.Log.Errorf("Error while marshaling entry: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
providers.EntryAdded(len(data))
|
||||
|
||||
if err = connection.SendText(string(data)); err != nil {
|
||||
logger.Log.Panicf("An error occured while inserting a new record to database: %v", err)
|
||||
logger.Log.Errorf("An error occured while inserting a new record to database: %v", err)
|
||||
connection.Close()
|
||||
time.Sleep(shared.BasenineReconnectInterval * time.Second)
|
||||
goto BasenineReconnect
|
||||
}
|
||||
|
||||
serviceMapGenerator := dependency.GetInstance(dependency.ServiceMapGeneratorDependency).(servicemap.ServiceMapSink)
|
||||
|
||||
@@ -24,7 +24,7 @@ func (e *BasenineEntryStreamer) Get(ctx context.Context, socketId int, params *W
|
||||
|
||||
connection, err := basenine.NewConnection(shared.BasenineHost, shared.BaseninePort)
|
||||
if err != nil {
|
||||
logger.Log.Errorf("failed to establish a connection to Basenine: %v", err)
|
||||
logger.Log.Errorf("Failed to establish a connection to Basenine: %v", err)
|
||||
entryStreamerSocketConnector.CleanupSocket(socketId)
|
||||
return err
|
||||
}
|
||||
@@ -80,7 +80,9 @@ func (e *BasenineEntryStreamer) Get(ctx context.Context, socketId int, params *W
|
||||
go handleMetaChannel(connection, meta)
|
||||
|
||||
if err = connection.Query(query, data, meta); err != nil {
|
||||
logger.Log.Panicf("Query mode call failed: %v", err)
|
||||
logger.Log.Errorf("Query mode call failed: %v", err)
|
||||
entryStreamerSocketConnector.CleanupSocket(socketId)
|
||||
return err
|
||||
}
|
||||
|
||||
go func() {
|
||||
|
||||
@@ -2,6 +2,7 @@ package controllers
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
core "k8s.io/api/core/v1"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
@@ -93,10 +94,6 @@ func GetGeneralStats(c *gin.Context) {
|
||||
c.JSON(http.StatusOK, providers.GetGeneralStats())
|
||||
}
|
||||
|
||||
func GetRecentTLSLinks(c *gin.Context) {
|
||||
c.JSON(http.StatusOK, providers.GetAllRecentTLSAddresses())
|
||||
}
|
||||
|
||||
func GetCurrentResolvingInformation(c *gin.Context) {
|
||||
c.JSON(http.StatusOK, holder.GetResolver().GetMap())
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package entries
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
basenine "github.com/up9inc/basenine/client/go"
|
||||
@@ -64,7 +65,7 @@ func (e *BasenineEntriesProvider) GetEntry(singleEntryRequest *models.SingleEntr
|
||||
}
|
||||
err = json.Unmarshal(bytes, &entry)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, errors.New(string(bytes))
|
||||
}
|
||||
|
||||
extension := app.ExtensionsMap[entry.Protocol.Name]
|
||||
|
||||
@@ -9,7 +9,6 @@ import (
|
||||
|
||||
basenine "github.com/up9inc/basenine/client/go"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"github.com/up9inc/mizu/tap"
|
||||
)
|
||||
|
||||
type EntriesRequest struct {
|
||||
@@ -44,11 +43,6 @@ type WebSocketTappedEntryMessage struct {
|
||||
Data *tapApi.OutputChannelItem
|
||||
}
|
||||
|
||||
type WebsocketOutboundLinkMessage struct {
|
||||
*shared.WebSocketMessageMetadata
|
||||
Data *tap.OutboundLink
|
||||
}
|
||||
|
||||
type AuthStatus struct {
|
||||
Email string `json:"email"`
|
||||
Model string `json:"model"`
|
||||
|
||||
@@ -4,19 +4,13 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/patrickmn/go-cache"
|
||||
"github.com/up9inc/mizu/agent/pkg/models"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"github.com/up9inc/mizu/tap"
|
||||
)
|
||||
|
||||
const tlsLinkRetainmentTime = time.Minute * 15
|
||||
|
||||
var (
|
||||
authStatus *models.AuthStatus
|
||||
RecentTLSLinks = cache.New(tlsLinkRetainmentTime, tlsLinkRetainmentTime)
|
||||
authStatus *models.AuthStatus
|
||||
)
|
||||
|
||||
func GetAuthStatus() (*models.AuthStatus, error) {
|
||||
@@ -51,16 +45,3 @@ func GetAuthStatus() (*models.AuthStatus, error) {
|
||||
|
||||
return authStatus, nil
|
||||
}
|
||||
|
||||
func GetAllRecentTLSAddresses() []string {
|
||||
recentTLSLinks := make([]string, 0)
|
||||
|
||||
for _, outboundLinkItem := range RecentTLSLinks.Items() {
|
||||
outboundLink, castOk := outboundLinkItem.Object.(*tap.OutboundLink)
|
||||
if castOk {
|
||||
recentTLSLinks = append(recentTLSLinks, outboundLink.DstIP)
|
||||
}
|
||||
}
|
||||
|
||||
return recentTLSLinks
|
||||
}
|
||||
|
||||
@@ -21,7 +21,5 @@ func StatusRoutes(ginApp *gin.Engine) {
|
||||
|
||||
routeGroup.GET("/general", controllers.GetGeneralStats) // get general stats about entries in DB
|
||||
|
||||
routeGroup.GET("/recentTLSLinks", controllers.GetRecentTLSLinks)
|
||||
|
||||
routeGroup.GET("/resolving", controllers.GetCurrentResolvingInformation)
|
||||
}
|
||||
|
||||
@@ -211,11 +211,15 @@ func syncEntriesImpl(token string, model string, envPrefix string, uploadInterva
|
||||
|
||||
logger.Log.Infof("Getting entries from the database")
|
||||
|
||||
BasenineReconnect:
|
||||
var connection *basenine.Connection
|
||||
var err error
|
||||
connection, err = basenine.NewConnection(shared.BasenineHost, shared.BaseninePort)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
logger.Log.Errorf("Can't establish a new connection to Basenine server: %v", err)
|
||||
connection.Close()
|
||||
time.Sleep(shared.BasenineReconnectInterval * time.Second)
|
||||
goto BasenineReconnect
|
||||
}
|
||||
|
||||
data := make(chan []byte)
|
||||
@@ -324,7 +328,10 @@ func syncEntriesImpl(token string, model string, envPrefix string, uploadInterva
|
||||
wg.Add(2)
|
||||
|
||||
if err = connection.Query(query, data, meta); err != nil {
|
||||
logger.Log.Panicf("Query mode call failed: %v", err)
|
||||
logger.Log.Errorf("Query mode call failed: %v", err)
|
||||
connection.Close()
|
||||
time.Sleep(shared.BasenineReconnectInterval * time.Second)
|
||||
goto BasenineReconnect
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
@@ -11,7 +11,7 @@ require (
|
||||
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
|
||||
github.com/spf13/cobra v1.3.0
|
||||
github.com/spf13/pflag v1.0.5
|
||||
github.com/up9inc/basenine/server/lib v0.0.0-20220413173135-69508ca741d7
|
||||
github.com/up9inc/basenine/server/lib v0.0.0-20220419100955-e2ca51087607
|
||||
github.com/up9inc/mizu/shared v0.0.0
|
||||
github.com/up9inc/mizu/tap/api v0.0.0
|
||||
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8
|
||||
|
||||
@@ -600,8 +600,8 @@ github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69
|
||||
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
|
||||
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
|
||||
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
|
||||
github.com/up9inc/basenine/server/lib v0.0.0-20220413173135-69508ca741d7 h1:3Mi+0tQFVHXYcrFhwH/h6/2b0tayLcYeFPXyzDV3rvc=
|
||||
github.com/up9inc/basenine/server/lib v0.0.0-20220413173135-69508ca741d7/go.mod h1:v0hIh31iwDGbkkdeSSppdMNm1oIigfCA2mG2XajKnf8=
|
||||
github.com/up9inc/basenine/server/lib v0.0.0-20220419100955-e2ca51087607 h1:gCOwbfjsLslDw63yj/3l9d5TH7ikhIWHd7j0lE9U26U=
|
||||
github.com/up9inc/basenine/server/lib v0.0.0-20220419100955-e2ca51087607/go.mod h1:v0hIh31iwDGbkkdeSSppdMNm1oIigfCA2mG2XajKnf8=
|
||||
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
|
||||
github.com/xlab/treeprint v0.0.0-20181112141820-a009c3971eca/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg=
|
||||
github.com/xlab/treeprint v1.1.0 h1:G/1DjNkPpfZCFt9CSh6b5/nY4VimlbHF3Rh4obvtzDk=
|
||||
|
||||
@@ -1,19 +1,20 @@
|
||||
package shared
|
||||
|
||||
const (
|
||||
MizuFilteringOptionsEnvVar = "SENSITIVE_DATA_FILTERING_OPTIONS"
|
||||
SyncEntriesConfigEnvVar = "SYNC_ENTRIES_CONFIG"
|
||||
HostModeEnvVar = "HOST_MODE"
|
||||
NodeNameEnvVar = "NODE_NAME"
|
||||
ConfigDirPath = "/app/config/"
|
||||
DataDirPath = "/app/data/"
|
||||
ValidationRulesFileName = "validation-rules.yaml"
|
||||
ContractFileName = "contract-oas.yaml"
|
||||
ConfigFileName = "mizu-config.json"
|
||||
GoGCEnvVar = "GOGC"
|
||||
DefaultApiServerPort = 8899
|
||||
LogLevelEnvVar = "LOG_LEVEL"
|
||||
MizuAgentImageRepo = "docker.io/up9inc/mizu"
|
||||
BasenineHost = "127.0.0.1"
|
||||
BaseninePort = "9099"
|
||||
MizuFilteringOptionsEnvVar = "SENSITIVE_DATA_FILTERING_OPTIONS"
|
||||
SyncEntriesConfigEnvVar = "SYNC_ENTRIES_CONFIG"
|
||||
HostModeEnvVar = "HOST_MODE"
|
||||
NodeNameEnvVar = "NODE_NAME"
|
||||
ConfigDirPath = "/app/config/"
|
||||
DataDirPath = "/app/data/"
|
||||
ValidationRulesFileName = "validation-rules.yaml"
|
||||
ContractFileName = "contract-oas.yaml"
|
||||
ConfigFileName = "mizu-config.json"
|
||||
GoGCEnvVar = "GOGC"
|
||||
DefaultApiServerPort = 8899
|
||||
LogLevelEnvVar = "LOG_LEVEL"
|
||||
MizuAgentImageRepo = "docker.io/up9inc/mizu"
|
||||
BasenineHost = "127.0.0.1"
|
||||
BaseninePort = "9099"
|
||||
BasenineReconnectInterval = 3
|
||||
)
|
||||
|
||||
@@ -806,11 +806,11 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
|
||||
agentContainer.WithResources(agentResources)
|
||||
|
||||
nodeSelectorRequirement := applyconfcore.NodeSelectorRequirement()
|
||||
nodeSelectorRequirement.WithKey("kubernetes.io/hostname")
|
||||
nodeSelectorRequirement.WithKey("metadata.name")
|
||||
nodeSelectorRequirement.WithOperator(core.NodeSelectorOpIn)
|
||||
nodeSelectorRequirement.WithValues(nodeNames...)
|
||||
nodeSelectorTerm := applyconfcore.NodeSelectorTerm()
|
||||
nodeSelectorTerm.WithMatchExpressions(nodeSelectorRequirement)
|
||||
nodeSelectorTerm.WithMatchFields(nodeSelectorRequirement)
|
||||
nodeSelector := applyconfcore.NodeSelector()
|
||||
nodeSelector.WithNodeSelectorTerms(nodeSelectorTerm)
|
||||
nodeAffinity := applyconfcore.NodeAffinity()
|
||||
|
||||
@@ -26,7 +26,8 @@ func GetNodeHostToTappedPodsMap(tappedPods []core.Pod) shared.NodeToPodsMap {
|
||||
func getMinimizedPod(fullPod core.Pod) core.Pod {
|
||||
return core.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fullPod.Name,
|
||||
Name: fullPod.Name,
|
||||
Namespace: fullPod.Namespace,
|
||||
},
|
||||
Status: core.PodStatus{
|
||||
PodIP: fullPod.Status.PodIP,
|
||||
|
||||
@@ -20,7 +20,6 @@ const (
|
||||
WebSocketMessageTypeUpdateStatus WebSocketMessageType = "status"
|
||||
WebSocketMessageTypeUpdateTappedPods WebSocketMessageType = "tappedPods"
|
||||
WebSocketMessageTypeAnalyzeStatus WebSocketMessageType = "analyzeStatus"
|
||||
WebsocketMessageTypeOutboundLink WebSocketMessageType = "outboundLink"
|
||||
WebSocketMessageTypeToast WebSocketMessageType = "toast"
|
||||
WebSocketMessageTypeQueryMetadata WebSocketMessageType = "queryMetadata"
|
||||
WebSocketMessageTypeStartTime WebSocketMessageType = "startTime"
|
||||
@@ -92,7 +91,7 @@ func (np NodeToPodsMap) Summary() map[string][]string {
|
||||
summary := make(map[string][]string)
|
||||
for node, pods := range np {
|
||||
for _, pod := range pods {
|
||||
summary[node] = append(summary[node], pod.Namespace + "/" + pod.Name)
|
||||
summary[node] = append(summary[node], pod.Namespace+"/"+pod.Name)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
)
|
||||
|
||||
const mizuTestEnvVar = "MIZU_TEST"
|
||||
const UNKNOWN_NAMESPACE = ""
|
||||
|
||||
var UnknownIp net.IP = net.IP{0, 0, 0, 0}
|
||||
var UnknownPort uint16 = 0
|
||||
@@ -92,14 +93,15 @@ type RequestResponsePair struct {
|
||||
Response GenericMessage `json:"response"`
|
||||
}
|
||||
|
||||
// `Protocol` is modified in the later stages of data propagation. Therefore it's not a pointer.
|
||||
type OutputChannelItem struct {
|
||||
// `Protocol` is modified in later stages of data propagation. Therefore, it's not a pointer.
|
||||
Protocol Protocol
|
||||
Capture Capture
|
||||
Timestamp int64
|
||||
ConnectionInfo *ConnectionInfo
|
||||
Pair *RequestResponsePair
|
||||
Summary *BaseEntry
|
||||
Namespace string
|
||||
}
|
||||
|
||||
type SuperTimer struct {
|
||||
|
||||
@@ -13,4 +13,4 @@ test-pull-bin:
|
||||
|
||||
test-pull-expect:
|
||||
@mkdir -p expect
|
||||
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect7/amqp/\* expect
|
||||
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect8/amqp/\* expect
|
||||
|
||||
@@ -13,4 +13,4 @@ test-pull-bin:
|
||||
|
||||
test-pull-expect:
|
||||
@mkdir -p expect
|
||||
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect7/http/\* expect
|
||||
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect8/http/\* expect
|
||||
|
||||
@@ -13,4 +13,4 @@ test-pull-bin:
|
||||
|
||||
test-pull-expect:
|
||||
@mkdir -p expect
|
||||
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect7/kafka/\* expect
|
||||
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect8/kafka/\* expect
|
||||
|
||||
@@ -13,4 +13,4 @@ test-pull-bin:
|
||||
|
||||
test-pull-expect:
|
||||
@mkdir -p expect
|
||||
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect7/redis/\* expect
|
||||
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect8/redis/\* expect
|
||||
|
||||
@@ -3,7 +3,6 @@ module github.com/up9inc/mizu/tap
|
||||
go 1.17
|
||||
|
||||
require (
|
||||
github.com/bradleyfalzon/tlsx v0.0.0-20170624122154-28fd0e59bac4
|
||||
github.com/cilium/ebpf v0.8.0
|
||||
github.com/go-errors/errors v1.4.2
|
||||
github.com/google/gopacket v1.1.19
|
||||
|
||||
@@ -91,8 +91,6 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r
|
||||
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
|
||||
github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqOes/6LfM=
|
||||
github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
|
||||
github.com/bradleyfalzon/tlsx v0.0.0-20170624122154-28fd0e59bac4 h1:NJOOlc6ZJjix0A1rAU+nxruZtR8KboG1848yqpIUo4M=
|
||||
github.com/bradleyfalzon/tlsx v0.0.0-20170624122154-28fd0e59bac4/go.mod h1:DQPxZS994Ld1Y8uwnJT+dRL04XPD0cElP/pHH/zEBHM=
|
||||
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
|
||||
github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
|
||||
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
|
||||
|
||||
@@ -29,21 +29,6 @@ func getLocalhostIPs() ([]string, error) {
|
||||
return myIPs, nil
|
||||
}
|
||||
|
||||
//lint:ignore U1000 will be used in the future
|
||||
func isPrivateIP(ipStr string) bool {
|
||||
ip := net.ParseIP(ipStr)
|
||||
if ip.IsLoopback() || ip.IsLinkLocalUnicast() || ip.IsLinkLocalMulticast() {
|
||||
return true
|
||||
}
|
||||
|
||||
for _, block := range privateIPBlocks {
|
||||
if block.Contains(ip) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func initPrivateIPBlocks() {
|
||||
for _, cidr := range []string{
|
||||
"127.0.0.0/8", // IPv4 loopback
|
||||
|
||||
@@ -1,39 +0,0 @@
|
||||
package tap
|
||||
|
||||
type OutboundLinkProtocol string
|
||||
|
||||
const (
|
||||
TLSProtocol OutboundLinkProtocol = "tls"
|
||||
)
|
||||
|
||||
type OutboundLink struct {
|
||||
Src string
|
||||
DstIP string
|
||||
DstPort int
|
||||
SuggestedResolvedName string
|
||||
SuggestedProtocol OutboundLinkProtocol
|
||||
}
|
||||
|
||||
func NewOutboundLinkWriter() *OutboundLinkWriter {
|
||||
return &OutboundLinkWriter{
|
||||
OutChan: make(chan *OutboundLink),
|
||||
}
|
||||
}
|
||||
|
||||
type OutboundLinkWriter struct {
|
||||
OutChan chan *OutboundLink
|
||||
}
|
||||
|
||||
func (olw *OutboundLinkWriter) WriteOutboundLink(src string, DstIP string, DstPort int, SuggestedResolvedName string, SuggestedProtocol OutboundLinkProtocol) {
|
||||
olw.OutChan <- &OutboundLink{
|
||||
Src: src,
|
||||
DstIP: DstIP,
|
||||
DstPort: DstPort,
|
||||
SuggestedResolvedName: SuggestedResolvedName,
|
||||
SuggestedProtocol: SuggestedProtocol,
|
||||
}
|
||||
}
|
||||
|
||||
func (olw *OutboundLinkWriter) Stop() {
|
||||
close(olw.OutChan)
|
||||
}
|
||||
@@ -27,9 +27,6 @@ import (
|
||||
|
||||
const cleanPeriod = time.Second * 10
|
||||
|
||||
//lint:ignore U1000 will be used in the future
|
||||
var remoteOnlyOutboundPorts = []int{80, 443}
|
||||
|
||||
var maxcount = flag.Int64("c", -1, "Only grab this many packets, then exit")
|
||||
var decoder = flag.String("decoder", "", "Name of the decoder to use (default: guess from capture)")
|
||||
var statsevery = flag.Int("stats", 60, "Output statistics every N seconds")
|
||||
@@ -58,7 +55,7 @@ var tls = flag.Bool("tls", false, "Enable TLS tapper")
|
||||
var memprofile = flag.String("memprofile", "", "Write memory profile")
|
||||
|
||||
type TapOpts struct {
|
||||
HostMode bool
|
||||
HostMode bool
|
||||
}
|
||||
|
||||
var extensions []*api.Extension // global
|
||||
@@ -68,24 +65,6 @@ var packetSourceManager *source.PacketSourceManager // global
|
||||
var mainPacketInputChan chan source.TcpPacketInfo // global
|
||||
var tlsTapperInstance *tlstapper.TlsTapper // global
|
||||
|
||||
func inArrayInt(arr []int, valueToCheck int) bool {
|
||||
for _, value := range arr {
|
||||
if value == valueToCheck {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func inArrayString(arr []string, valueToCheck string) bool {
|
||||
for _, value := range arr {
|
||||
if value == valueToCheck {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem, extensionsRef []*api.Extension, options *api.TrafficFilteringOptions) {
|
||||
extensions = extensionsRef
|
||||
filteringOptions = options
|
||||
|
||||
@@ -7,13 +7,10 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/bradleyfalzon/tlsx"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
const checkTLSPacketAmount = 100
|
||||
|
||||
type tcpReaderDataMsg struct {
|
||||
bytes []byte
|
||||
timestamp time.Time
|
||||
@@ -33,22 +30,21 @@ type ConnectionInfo struct {
|
||||
* Implements io.Reader interface (Read)
|
||||
*/
|
||||
type tcpReader struct {
|
||||
ident string
|
||||
tcpID *api.TcpID
|
||||
isClosed bool
|
||||
isClient bool
|
||||
isOutgoing bool
|
||||
msgQueue chan tcpReaderDataMsg // Channel of captured reassembled tcp payload
|
||||
data []byte
|
||||
progress *api.ReadProgress
|
||||
superTimer *api.SuperTimer
|
||||
parent *tcpStream
|
||||
packetsSeen uint
|
||||
outboundLinkWriter *OutboundLinkWriter
|
||||
extension *api.Extension
|
||||
emitter api.Emitter
|
||||
counterPair *api.CounterPair
|
||||
reqResMatcher api.RequestResponseMatcher
|
||||
ident string
|
||||
tcpID *api.TcpID
|
||||
isClosed bool
|
||||
isClient bool
|
||||
isOutgoing bool
|
||||
msgQueue chan tcpReaderDataMsg // Channel of captured reassembled tcp payload
|
||||
data []byte
|
||||
progress *api.ReadProgress
|
||||
superTimer *api.SuperTimer
|
||||
parent *tcpStream
|
||||
packetsSeen uint
|
||||
extension *api.Extension
|
||||
emitter api.Emitter
|
||||
counterPair *api.CounterPair
|
||||
reqResMatcher api.RequestResponseMatcher
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
@@ -64,16 +60,6 @@ func (h *tcpReader) Read(p []byte) (int, error) {
|
||||
if len(h.data) > 0 {
|
||||
h.packetsSeen += 1
|
||||
}
|
||||
if h.packetsSeen < checkTLSPacketAmount && len(msg.bytes) > 5 { // packets with less than 5 bytes cause tlsx to panic
|
||||
clientHello := tlsx.ClientHello{}
|
||||
err := clientHello.Unmarshall(msg.bytes)
|
||||
if err == nil {
|
||||
logger.Log.Debugf("Detected TLS client hello with SNI %s", clientHello.SNI)
|
||||
// TODO: Throws `panic: runtime error: invalid memory address or nil pointer dereference` error.
|
||||
// numericPort, _ := strconv.Atoi(h.tcpID.DstPort)
|
||||
// h.outboundLinkWriter.WriteOutboundLink(h.tcpID.SrcIP, h.tcpID.DstIP, numericPort, clientHello.SNI, TLSProtocol)
|
||||
}
|
||||
}
|
||||
}
|
||||
if !ok || len(h.data) == 0 {
|
||||
return 0, io.EOF
|
||||
|
||||
@@ -20,12 +20,11 @@ import (
|
||||
* Generates a new tcp stream for each new tcp connection. Closes the stream when the connection closes.
|
||||
*/
|
||||
type tcpStreamFactory struct {
|
||||
wg sync.WaitGroup
|
||||
outboundLinkWriter *OutboundLinkWriter
|
||||
Emitter api.Emitter
|
||||
streamsMap *tcpStreamMap
|
||||
ownIps []string
|
||||
opts *TapOpts
|
||||
wg sync.WaitGroup
|
||||
Emitter api.Emitter
|
||||
streamsMap *tcpStreamMap
|
||||
ownIps []string
|
||||
opts *TapOpts
|
||||
}
|
||||
|
||||
type tcpStreamWrapper struct {
|
||||
@@ -63,9 +62,6 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
|
||||
srcPort := transport.Src().String()
|
||||
dstPort := transport.Dst().String()
|
||||
|
||||
// if factory.shouldNotifyOnOutboundLink(dstIp, dstPort) {
|
||||
// factory.outboundLinkWriter.WriteOutboundLink(net.Src().String(), dstIp, dstPort, "", "")
|
||||
// }
|
||||
props := factory.getStreamProps(srcIp, srcPort, dstIp, dstPort)
|
||||
isTapTarget := props.isTapTarget
|
||||
stream := &tcpStream{
|
||||
@@ -99,14 +95,13 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
|
||||
SrcPort: srcPort,
|
||||
DstPort: dstPort,
|
||||
},
|
||||
parent: stream,
|
||||
isClient: true,
|
||||
isOutgoing: props.isOutgoing,
|
||||
outboundLinkWriter: factory.outboundLinkWriter,
|
||||
extension: extension,
|
||||
emitter: factory.Emitter,
|
||||
counterPair: counterPair,
|
||||
reqResMatcher: reqResMatcher,
|
||||
parent: stream,
|
||||
isClient: true,
|
||||
isOutgoing: props.isOutgoing,
|
||||
extension: extension,
|
||||
emitter: factory.Emitter,
|
||||
counterPair: counterPair,
|
||||
reqResMatcher: reqResMatcher,
|
||||
})
|
||||
stream.servers = append(stream.servers, tcpReader{
|
||||
msgQueue: make(chan tcpReaderDataMsg),
|
||||
@@ -119,14 +114,13 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
|
||||
SrcPort: transport.Dst().String(),
|
||||
DstPort: transport.Src().String(),
|
||||
},
|
||||
parent: stream,
|
||||
isClient: false,
|
||||
isOutgoing: props.isOutgoing,
|
||||
outboundLinkWriter: factory.outboundLinkWriter,
|
||||
extension: extension,
|
||||
emitter: factory.Emitter,
|
||||
counterPair: counterPair,
|
||||
reqResMatcher: reqResMatcher,
|
||||
parent: stream,
|
||||
isClient: false,
|
||||
isOutgoing: props.isOutgoing,
|
||||
extension: extension,
|
||||
emitter: factory.Emitter,
|
||||
counterPair: counterPair,
|
||||
reqResMatcher: reqResMatcher,
|
||||
})
|
||||
|
||||
factory.streamsMap.Store(stream.id, &tcpStreamWrapper{
|
||||
@@ -174,15 +168,6 @@ func (factory *tcpStreamFactory) getStreamProps(srcIP string, srcPort string, ds
|
||||
}
|
||||
}
|
||||
|
||||
//lint:ignore U1000 will be used in the future
|
||||
func (factory *tcpStreamFactory) shouldNotifyOnOutboundLink(dstIP string, dstPort int) bool {
|
||||
if inArrayInt(remoteOnlyOutboundPorts, dstPort) {
|
||||
isDirectedHere := inArrayString(factory.ownIps, dstIP)
|
||||
return !isDirectedHere && !isPrivateIP(dstIP)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func getPacketOrigin(ac reassembly.AssemblerContext) api.Capture {
|
||||
c, ok := ac.(*context)
|
||||
|
||||
|
||||
13
tap/tlstapper/tls_emitter.go
Normal file
13
tap/tlstapper/tls_emitter.go
Normal file
@@ -0,0 +1,13 @@
|
||||
package tlstapper
|
||||
|
||||
import "github.com/up9inc/mizu/tap/api"
|
||||
|
||||
type tlsEmitter struct {
|
||||
delegate api.Emitter
|
||||
namespace string
|
||||
}
|
||||
|
||||
func (e *tlsEmitter) Emit(item *api.OutputChannelItem) {
|
||||
item.Namespace = e.namespace
|
||||
e.delegate.Emit(item)
|
||||
}
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
@@ -19,13 +20,14 @@ import (
|
||||
)
|
||||
|
||||
type tlsPoller struct {
|
||||
tls *TlsTapper
|
||||
readers map[string]*tlsReader
|
||||
closedReaders chan string
|
||||
reqResMatcher api.RequestResponseMatcher
|
||||
chunksReader *perf.Reader
|
||||
extension *api.Extension
|
||||
procfs string
|
||||
tls *TlsTapper
|
||||
readers map[string]*tlsReader
|
||||
closedReaders chan string
|
||||
reqResMatcher api.RequestResponseMatcher
|
||||
chunksReader *perf.Reader
|
||||
extension *api.Extension
|
||||
procfs string
|
||||
pidToNamespace sync.Map
|
||||
}
|
||||
|
||||
func newTlsPoller(tls *TlsTapper, extension *api.Extension, procfs string) *tlsPoller {
|
||||
@@ -151,16 +153,21 @@ func (p *tlsPoller) startNewTlsReader(chunk *tlsChunk, ip net.IP, port uint16, k
|
||||
|
||||
tcpid := p.buildTcpId(chunk, ip, port)
|
||||
|
||||
go dissect(extension, reader, chunk.isRequest(), &tcpid, emitter, options, p.reqResMatcher)
|
||||
tlsEmitter := &tlsEmitter{
|
||||
delegate: emitter,
|
||||
namespace: p.getNamespace(chunk.Pid),
|
||||
}
|
||||
|
||||
go dissect(extension, reader, chunk.isRequest(), &tcpid, tlsEmitter, options, p.reqResMatcher)
|
||||
return reader
|
||||
}
|
||||
|
||||
func dissect(extension *api.Extension, reader *tlsReader, isRequest bool, tcpid *api.TcpID,
|
||||
emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher api.RequestResponseMatcher) {
|
||||
tlsEmitter *tlsEmitter, options *api.TrafficFilteringOptions, reqResMatcher api.RequestResponseMatcher) {
|
||||
b := bufio.NewReader(reader)
|
||||
|
||||
err := extension.Dissector.Dissect(b, reader.progress, api.Ebpf, isRequest, tcpid, &api.CounterPair{},
|
||||
&api.SuperTimer{}, &api.SuperIdentifier{}, emitter, options, reqResMatcher)
|
||||
&api.SuperTimer{}, &api.SuperIdentifier{}, tlsEmitter, options, reqResMatcher)
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Warningf("Error dissecting TLS %v - %v", tcpid, err)
|
||||
@@ -205,6 +212,33 @@ func (p *tlsPoller) buildTcpId(chunk *tlsChunk, ip net.IP, port uint16) api.TcpI
|
||||
}
|
||||
}
|
||||
|
||||
func (p *tlsPoller) addPid(pid uint32, namespace string) {
|
||||
p.pidToNamespace.Store(pid, namespace)
|
||||
}
|
||||
|
||||
func (p *tlsPoller) getNamespace(pid uint32) string {
|
||||
namespaceIfc, ok := p.pidToNamespace.Load(pid)
|
||||
|
||||
if !ok {
|
||||
return api.UNKNOWN_NAMESPACE
|
||||
}
|
||||
|
||||
namespace, ok := namespaceIfc.(string)
|
||||
|
||||
if !ok {
|
||||
return api.UNKNOWN_NAMESPACE
|
||||
}
|
||||
|
||||
return namespace
|
||||
}
|
||||
|
||||
func (p *tlsPoller) clearPids() {
|
||||
p.pidToNamespace.Range(func(key, v interface{}) bool {
|
||||
p.pidToNamespace.Delete(key)
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
func (p *tlsPoller) logTls(chunk *tlsChunk, ip net.IP, port uint16) {
|
||||
var flagsStr string
|
||||
|
||||
|
||||
@@ -24,11 +24,11 @@ func UpdateTapTargets(tls *TlsTapper, pods *[]v1.Pod, procfs string) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
tls.ClearPids()
|
||||
|
||||
for _, pid := range containerPids {
|
||||
if err := tls.AddPid(procfs, pid); err != nil {
|
||||
for pid, pod := range containerPids {
|
||||
if err := tls.AddPid(procfs, pid, pod.Namespace); err != nil {
|
||||
LogError(err)
|
||||
}
|
||||
}
|
||||
@@ -36,8 +36,8 @@ func UpdateTapTargets(tls *TlsTapper, pods *[]v1.Pod, procfs string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func findContainerPids(procfs string, containerIds map[string]bool) ([]uint32, error) {
|
||||
result := make([]uint32, 0)
|
||||
func findContainerPids(procfs string, containerIds map[string]v1.Pod) (map[uint32]v1.Pod, error) {
|
||||
result := make(map[uint32]v1.Pod)
|
||||
|
||||
pids, err := ioutil.ReadDir(procfs)
|
||||
|
||||
@@ -63,7 +63,9 @@ func findContainerPids(procfs string, containerIds map[string]bool) ([]uint32, e
|
||||
continue
|
||||
}
|
||||
|
||||
if _, ok := containerIds[cgroup]; !ok {
|
||||
pod, ok := containerIds[cgroup]
|
||||
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -73,14 +75,14 @@ func findContainerPids(procfs string, containerIds map[string]bool) ([]uint32, e
|
||||
continue
|
||||
}
|
||||
|
||||
result = append(result, uint32(pidNumber))
|
||||
result[uint32(pidNumber)] = pod
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func buildContainerIdsMap(pods *[]v1.Pod) map[string]bool {
|
||||
result := make(map[string]bool)
|
||||
func buildContainerIdsMap(pods *[]v1.Pod) map[string]v1.Pod {
|
||||
result := make(map[string]v1.Pod)
|
||||
|
||||
for _, pod := range *pods {
|
||||
for _, container := range pod.Status.ContainerStatuses {
|
||||
@@ -91,7 +93,7 @@ func buildContainerIdsMap(pods *[]v1.Pod) map[string]bool {
|
||||
continue
|
||||
}
|
||||
|
||||
result[url.Host] = true
|
||||
result[url.Host] = pod
|
||||
}
|
||||
}
|
||||
|
||||
@@ -141,14 +143,14 @@ func extractCgroup(lines []string) string {
|
||||
// /kubepods.slice/kubepods-burstable.slice/kubepods-burstable-pod3beae8e0_164d_4689_a087_efd902d8c2ab.slice/docker-<ID>.scope
|
||||
// /kubepods/besteffort/pod7709c1d5-447c-428f-bed9-8ddec35c93f4/<ID>
|
||||
//
|
||||
// This function extract the <ID> out of the cgroup path, the <ID> should match
|
||||
// This function extract the <ID> out of the cgroup path, the <ID> should match
|
||||
// the "Container ID:" field when running kubectl describe pod <POD>
|
||||
//
|
||||
func normalizeCgroup(cgrouppath string) string {
|
||||
basename := strings.TrimSpace(path.Base(cgrouppath))
|
||||
|
||||
|
||||
if strings.Contains(basename, "-") {
|
||||
basename = basename[strings.Index(basename, "-") + 1:]
|
||||
basename = basename[strings.Index(basename, "-")+1:]
|
||||
}
|
||||
|
||||
if strings.Contains(basename, ".") {
|
||||
|
||||
@@ -1,11 +1,12 @@
|
||||
package tlstapper
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/cilium/ebpf/rlimit"
|
||||
"github.com/go-errors/errors"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
"sync"
|
||||
)
|
||||
|
||||
const GLOABL_TAP_PID = 0
|
||||
@@ -58,10 +59,10 @@ func (t *TlsTapper) PollForLogging() {
|
||||
}
|
||||
|
||||
func (t *TlsTapper) GlobalTap(sslLibrary string) error {
|
||||
return t.tapPid(GLOABL_TAP_PID, sslLibrary)
|
||||
return t.tapPid(GLOABL_TAP_PID, sslLibrary, api.UNKNOWN_NAMESPACE)
|
||||
}
|
||||
|
||||
func (t *TlsTapper) AddPid(procfs string, pid uint32) error {
|
||||
func (t *TlsTapper) AddPid(procfs string, pid uint32, namespace string) error {
|
||||
sslLibrary, err := findSsllib(procfs, pid)
|
||||
|
||||
if err != nil {
|
||||
@@ -69,7 +70,7 @@ func (t *TlsTapper) AddPid(procfs string, pid uint32) error {
|
||||
return nil // hide the error on purpose, its OK for a process to not use libssl.so
|
||||
}
|
||||
|
||||
return t.tapPid(pid, sslLibrary)
|
||||
return t.tapPid(pid, sslLibrary, namespace)
|
||||
}
|
||||
|
||||
func (t *TlsTapper) RemovePid(pid uint32) error {
|
||||
@@ -85,12 +86,13 @@ func (t *TlsTapper) RemovePid(pid uint32) error {
|
||||
}
|
||||
|
||||
func (t *TlsTapper) ClearPids() {
|
||||
t.poller.clearPids()
|
||||
t.registeredPids.Range(func(key, v interface{}) bool {
|
||||
pid := key.(uint32)
|
||||
if pid == GLOABL_TAP_PID {
|
||||
return true
|
||||
}
|
||||
|
||||
|
||||
if err := t.RemovePid(pid); err != nil {
|
||||
LogError(err)
|
||||
}
|
||||
@@ -133,7 +135,7 @@ func setupRLimit() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *TlsTapper) tapPid(pid uint32, sslLibrary string) error {
|
||||
func (t *TlsTapper) tapPid(pid uint32, sslLibrary string, namespace string) error {
|
||||
logger.Log.Infof("Tapping TLS (pid: %v) (sslLibrary: %v)", pid, sslLibrary)
|
||||
|
||||
newSsl := sslHooks{}
|
||||
@@ -144,12 +146,14 @@ func (t *TlsTapper) tapPid(pid uint32, sslLibrary string) error {
|
||||
|
||||
t.sslHooksStructs = append(t.sslHooksStructs, newSsl)
|
||||
|
||||
t.poller.addPid(pid, namespace)
|
||||
|
||||
pids := t.bpfObjects.tlsTapperMaps.PidsMap
|
||||
|
||||
if err := pids.Put(pid, uint32(1)); err != nil {
|
||||
return errors.Wrap(err, 0)
|
||||
}
|
||||
|
||||
|
||||
t.registeredPids.Store(pid, true)
|
||||
|
||||
return nil
|
||||
|
||||
@@ -54,11 +54,6 @@ export default class Api {
|
||||
return response.data;
|
||||
}
|
||||
|
||||
getRecentTLSLinks = async () => {
|
||||
const response = await client.get("/status/recentTLSLinks");
|
||||
return response.data;
|
||||
}
|
||||
|
||||
getOasServices = async () => {
|
||||
const response = await client.get("/oas");
|
||||
return response.data;
|
||||
@@ -121,4 +116,4 @@ export function getWebsocketUrl(){
|
||||
}
|
||||
|
||||
return websocketUrl;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,12 +0,0 @@
|
||||
.httpsDomains
|
||||
display: none
|
||||
margin: 0
|
||||
padding: 0
|
||||
list-style: none
|
||||
|
||||
.customWarningStyle
|
||||
&:hover
|
||||
overflow-y: scroll
|
||||
height: 85px
|
||||
.httpsDomains
|
||||
display: block
|
||||
@@ -1,44 +0,0 @@
|
||||
import {Snackbar} from "@material-ui/core";
|
||||
import MuiAlert from "@material-ui/lab/Alert";
|
||||
import React, {useEffect} from "react";
|
||||
import { RecoilState, useRecoilValue } from "recoil";
|
||||
import TrafficViewerApiAtom from "../../recoil/TrafficViewerApi/atom";
|
||||
import TrafficViewerApi from "../TrafficViewer/TrafficViewerApi";
|
||||
import './TLSWarning.sass';
|
||||
|
||||
interface TLSWarningProps {
|
||||
showTLSWarning: boolean
|
||||
setShowTLSWarning: (show: boolean) => void
|
||||
addressesWithTLS: Set<string>
|
||||
setAddressesWithTLS: (addresses: Set<string>) => void
|
||||
userDismissedTLSWarning: boolean
|
||||
setUserDismissedTLSWarning: (flag: boolean) => void
|
||||
}
|
||||
|
||||
export const TLSWarning: React.FC<TLSWarningProps> = ({showTLSWarning, setShowTLSWarning, addressesWithTLS, setAddressesWithTLS, userDismissedTLSWarning, setUserDismissedTLSWarning}) => {
|
||||
|
||||
const trafficViewerApi = useRecoilValue(TrafficViewerApiAtom as RecoilState<TrafficViewerApi>)
|
||||
useEffect(() => {
|
||||
(async () => {
|
||||
try {
|
||||
const getRecentTLSLinksFunc = trafficViewerApi?.getRecentTLSLinks ? trafficViewerApi?.getRecentTLSLinks : function(){}
|
||||
const recentTLSLinks = await getRecentTLSLinksFunc();
|
||||
if (recentTLSLinks?.length > 0) {
|
||||
setAddressesWithTLS(new Set(recentTLSLinks));
|
||||
setShowTLSWarning(true);
|
||||
}
|
||||
} catch (e) {
|
||||
console.error(e);
|
||||
}
|
||||
})();
|
||||
}, [setShowTLSWarning, setAddressesWithTLS,trafficViewerApi]);
|
||||
|
||||
return (<Snackbar open={showTLSWarning && !userDismissedTLSWarning}>
|
||||
<MuiAlert classes={{filledWarning: 'customWarningStyle'}} elevation={6} variant="filled"
|
||||
onClose={() => setUserDismissedTLSWarning(true)} severity="warning">
|
||||
Mizu is detecting TLS traffic, this type of traffic will not be displayed.
|
||||
{addressesWithTLS.size > 0 &&
|
||||
<ul className="httpsDomains"> {Array.from(addressesWithTLS, address => <li>{address}</li>)} </ul>}
|
||||
</MuiAlert>
|
||||
</Snackbar>);
|
||||
}
|
||||
@@ -12,7 +12,7 @@ import TrafficViewerApiAtom from "../../recoil/TrafficViewerApi";
|
||||
import TrafficViewerApi from "./TrafficViewerApi";
|
||||
import focusedEntryIdAtom from "../../recoil/focusedEntryId";
|
||||
import {toast} from "react-toastify";
|
||||
import {TOAST_CONTAINER_ID} from "../../configs/Consts";
|
||||
import {MAX_ENTRIES, TOAST_CONTAINER_ID} from "../../configs/Consts";
|
||||
import tappingStatusAtom from "../../recoil/tappingStatus";
|
||||
import leftOffTopAtom from "../../recoil/leftOffTop";
|
||||
|
||||
@@ -98,8 +98,8 @@ export const EntriesList: React.FC<EntriesListProps> = ({
|
||||
setIsLoadingTop(false);
|
||||
|
||||
const newEntries = [...data.data.reverse(), ...entries];
|
||||
if(newEntries.length > 10000) {
|
||||
newEntries.splice(10000, newEntries.length - 10000)
|
||||
if(newEntries.length > MAX_ENTRIES) {
|
||||
newEntries.splice(MAX_ENTRIES, newEntries.length - MAX_ENTRIES)
|
||||
}
|
||||
setEntries(newEntries);
|
||||
|
||||
@@ -118,23 +118,28 @@ export const EntriesList: React.FC<EntriesListProps> = ({
|
||||
|
||||
const scrollbarVisible = scrollableRef.current?.childWrapperRef.current.clientHeight > scrollableRef.current?.wrapperRef.current.clientHeight;
|
||||
|
||||
useEffect(() => {
|
||||
if (!focusedEntryId && entries.length > 0)
|
||||
setFocusedEntryId(entries[0].id);
|
||||
}, [focusedEntryId, entries])
|
||||
|
||||
useEffect(() => {
|
||||
const newEntries = [...entries];
|
||||
if (newEntries.length > MAX_ENTRIES) {
|
||||
setLeftOffTop(newEntries[0].id);
|
||||
newEntries.splice(0, newEntries.length - MAX_ENTRIES)
|
||||
setNoMoreDataTop(false);
|
||||
setEntries(newEntries);
|
||||
}
|
||||
}, [entries])
|
||||
|
||||
if (ws.current) {
|
||||
if(ws.current && !ws.current.onmessage) {
|
||||
ws.current.onmessage = (e) => {
|
||||
if (!e?.data) return;
|
||||
const message = JSON.parse(e.data);
|
||||
switch (message.messageType) {
|
||||
case "entry":
|
||||
const entry = message.data;
|
||||
if (!focusedEntryId) setFocusedEntryId(entry.id);
|
||||
const newEntries = [...entries, entry];
|
||||
if (newEntries.length > 10000) {
|
||||
setLeftOffTop(newEntries[0].id);
|
||||
newEntries.splice(0, newEntries.length - 10000)
|
||||
setNoMoreDataTop(false);
|
||||
}
|
||||
setEntries(newEntries);
|
||||
setEntries(entriesState => [...entriesState, message.data]);
|
||||
break;
|
||||
case "status":
|
||||
setTappingStatus(message.tappingStatus);
|
||||
@@ -151,9 +156,7 @@ export const EntriesList: React.FC<EntriesListProps> = ({
|
||||
case "queryMetadata":
|
||||
setTruncatedTimestamp(message.data.truncatedTimestamp);
|
||||
setQueriedTotal(message.data.total);
|
||||
if (leftOffTop === "") {
|
||||
setLeftOffTop(message.data.leftOff);
|
||||
}
|
||||
setLeftOffTop(leftOffState => leftOffState === "" ? message.data.leftOff : leftOffState);
|
||||
break;
|
||||
case "startTime":
|
||||
setStartTime(message.data);
|
||||
@@ -206,7 +209,7 @@ export const EntriesList: React.FC<EntriesListProps> = ({
|
||||
</div>
|
||||
|
||||
<div className={styles.footer}>
|
||||
<div>Displaying <b id="entries-length">{entries?.length}</b> results out of <b
|
||||
<div>Displaying <b id="entries-length">{entries?.length > MAX_ENTRIES ? MAX_ENTRIES : entries?.length}</b> results out of <b
|
||||
id="total-entries">{queriedTotal}</b> total
|
||||
</div>
|
||||
{startTime !== 0 && <div>Started listening at <span style={{
|
||||
|
||||
@@ -14,7 +14,6 @@ import {RecoilRoot, RecoilState, useRecoilState, useRecoilValue, useSetRecoilSta
|
||||
import entriesAtom from "../../recoil/entries";
|
||||
import focusedEntryIdAtom from "../../recoil/focusedEntryId";
|
||||
import queryAtom from "../../recoil/query";
|
||||
import {TLSWarning} from "../TLSWarning/TLSWarning";
|
||||
import trafficViewerApiAtom from "../../recoil/TrafficViewerApi"
|
||||
import TrafficViewerApi from "./TrafficViewerApi";
|
||||
import {StatusBar} from "../UI/StatusBar";
|
||||
@@ -77,10 +76,6 @@ export const TrafficViewer: React.FC<TrafficViewerProps> = ({
|
||||
const setLeftOffTop = useSetRecoilState(leftOffTopAtom);
|
||||
const scrollableRef = useRef(null);
|
||||
|
||||
const [showTLSWarning, setShowTLSWarning] = useState(false);
|
||||
const [userDismissedTLSWarning, setUserDismissedTLSWarning] = useState(false);
|
||||
const [addressesWithTLS, setAddressesWithTLS] = useState(new Set<string>());
|
||||
|
||||
const handleQueryChange = useMemo(
|
||||
() =>
|
||||
debounce(async (query: string) => {
|
||||
@@ -286,12 +281,6 @@ export const TrafficViewer: React.FC<TrafficViewerProps> = ({
|
||||
<EntryDetailed/>
|
||||
</div>
|
||||
</div>}
|
||||
<TLSWarning showTLSWarning={showTLSWarning}
|
||||
setShowTLSWarning={setShowTLSWarning}
|
||||
addressesWithTLS={addressesWithTLS}
|
||||
setAddressesWithTLS={setAddressesWithTLS}
|
||||
userDismissedTLSWarning={userDismissedTLSWarning}
|
||||
setUserDismissedTLSWarning={setUserDismissedTLSWarning}/>
|
||||
</div>
|
||||
);
|
||||
};
|
||||
|
||||
@@ -4,7 +4,6 @@ type TrafficViewerApi = {
|
||||
analyzeStatus: () => any
|
||||
fetchEntries: (leftOff: any, direction: number, query: any, limit: number, timeoutMs: number) => any
|
||||
getEntry: (entryId: any, query: string) => any
|
||||
getRecentTLSLinks: () => any,
|
||||
webSocket: {
|
||||
close: () => void
|
||||
}
|
||||
|
||||
@@ -1,19 +1,22 @@
|
||||
import React, { CSSProperties } from "react";
|
||||
import React from "react";
|
||||
import styles from "./style/InformationIcon.module.sass"
|
||||
|
||||
const DEFUALT_LINK = "https://getmizu.io/docs"
|
||||
|
||||
export interface InformationIconProps {
|
||||
interface LinkProps {
|
||||
link?: string,
|
||||
style?: CSSProperties
|
||||
className?: string
|
||||
title?: string
|
||||
}
|
||||
|
||||
export const InformationIcon: React.FC<InformationIconProps> = ({ link, style }) => {
|
||||
return <React.Fragment>
|
||||
<a href={DEFUALT_LINK ? DEFUALT_LINK : link} style={style} className={styles.linkStyle} title="documentation" target="_blank">
|
||||
<span>Docs</span>
|
||||
</a>
|
||||
</React.Fragment>
|
||||
export const Link: React.FC<LinkProps> = ({ link, className, title, children }) => {
|
||||
return <a href={link} className={className} title={title} target="_blank">
|
||||
{children}
|
||||
</a>
|
||||
}
|
||||
|
||||
|
||||
export const InformationIcon: React.FC<LinkProps> = ({ className }) => {
|
||||
return <Link title="documentation" className={`${styles.linkStyle} ${className}`} link={DEFUALT_LINK}>
|
||||
<span>Docs</span>
|
||||
</Link>
|
||||
}
|
||||
@@ -5,10 +5,10 @@ import Tooltip from "./Tooltip";
|
||||
import Checkbox from "./Checkbox"
|
||||
import { StatusBar } from "./StatusBar";
|
||||
import CustomModal from "./CustomModal";
|
||||
import { InformationIcon } from "./InformationIcon";
|
||||
import { InformationIcon, Link } from "./InformationIcon";
|
||||
import SelectList from "./SelectList";
|
||||
import NoDataMessage from "./NoDataMessage";
|
||||
|
||||
|
||||
export { LoadingOverlay, Select, Tabs, Tooltip, Checkbox, CustomModal, InformationIcon, SelectList, NoDataMessage }
|
||||
export { LoadingOverlay, Select, Tabs, Tooltip, Checkbox, CustomModal, InformationIcon, SelectList, NoDataMessage, Link }
|
||||
export { StatusBar }
|
||||
@@ -1 +1,2 @@
|
||||
export const TOAST_CONTAINER_ID = "Common";
|
||||
export const TOAST_CONTAINER_ID = "Common";
|
||||
export const MAX_ENTRIES = 10000;
|
||||
|
||||
@@ -62,11 +62,6 @@ export default class Api {
|
||||
return response.data;
|
||||
}
|
||||
|
||||
getRecentTLSLinks = async () => {
|
||||
const response = await client.get("/status/recentTLSLinks");
|
||||
return response.data;
|
||||
}
|
||||
|
||||
getAuthStatus = async () => {
|
||||
const response = await client.get("/status/auth");
|
||||
return response.data;
|
||||
|
||||
Reference in New Issue
Block a user