mirror of
https://github.com/kubeshark/kubeshark.git
synced 2026-02-18 20:09:53 +00:00
Compare commits
59 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
dca636b0fd | ||
|
|
12d873d344 | ||
|
|
672accba0c | ||
|
|
566eab3527 | ||
|
|
0f52533cd8 | ||
|
|
eef58496b5 | ||
|
|
1137f9386b | ||
|
|
93714ab902 | ||
|
|
fc03ba2eda | ||
|
|
3662fbcdf6 | ||
|
|
b762e3c194 | ||
|
|
35ef211477 | ||
|
|
feb386ba1f | ||
|
|
ed4a818a53 | ||
|
|
fa733025dc | ||
|
|
5f603e3291 | ||
|
|
b84c698c1a | ||
|
|
9b72cc7aa6 | ||
|
|
c59aadb221 | ||
|
|
6aaee4b519 | ||
|
|
6f47ad862e | ||
|
|
f18f3da99c | ||
|
|
3e32c889d9 | ||
|
|
f604a3a35d | ||
|
|
5d205b5082 | ||
|
|
756f5f5720 | ||
|
|
9a1c17cc61 | ||
|
|
64253cd919 | ||
|
|
accad7c058 | ||
|
|
485bc7fd2b | ||
|
|
bc3efc6d4c | ||
|
|
135b1a5e1e | ||
|
|
31dcfc4b2e | ||
|
|
fcf27e7c4d | ||
|
|
8b4d813bd8 | ||
|
|
b7d3ff6eb8 | ||
|
|
931b6f4260 | ||
|
|
ba7b97cf7b | ||
|
|
8316f8456f | ||
|
|
f98185f0f0 | ||
|
|
88a5befd4b | ||
|
|
1bf5bf0b31 | ||
|
|
2c8d1f854d | ||
|
|
7dad5be676 | ||
|
|
b3cfd20a78 | ||
|
|
1c4588a83c | ||
|
|
76bb3db553 | ||
|
|
ff2131ea1e | ||
|
|
107c2d5b59 | ||
|
|
4bc16fa0b4 | ||
|
|
47237f05a5 | ||
|
|
ea8359cbdf | ||
|
|
27c7d66478 | ||
|
|
5473f11215 | ||
|
|
3497dc057b | ||
|
|
f958de6619 | ||
|
|
19fba89ca5 | ||
|
|
07c19b5d6d | ||
|
|
fc5d6b2d0a |
12
README.md
12
README.md
@@ -1,5 +1,5 @@
|
||||
# 水 mizu
|
||||
standalone web app traffic viewer for Kubernetes
|
||||
A simple-yet-powerful API traffic viewer for Kubernetes to help you troubleshoot and debug your microservices. Think TCPDump and Chrome Dev Tools combined.
|
||||
|
||||
## Download
|
||||
|
||||
@@ -13,13 +13,6 @@ curl -Lo mizu \
|
||||
https://github.com/up9inc/mizu/releases/latest/download/mizu_darwin_amd64 \
|
||||
&& chmod 755 mizu
|
||||
```
|
||||
|
||||
* for MacOS - Apple Silicon
|
||||
```
|
||||
curl -Lo mizu \
|
||||
https://github.com/up9inc/mizu/releases/latest/download/mizu_darwin_arm64 \
|
||||
&& chmod 755 mizu
|
||||
```
|
||||
|
||||
* for Linux - Intel 64bit
|
||||
```
|
||||
@@ -28,7 +21,6 @@ https://github.com/up9inc/mizu/releases/latest/download/mizu_linux_amd64 \
|
||||
&& chmod 755 mizu
|
||||
```
|
||||
|
||||
|
||||
SHA256 checksums are available on the [Releases](https://github.com/up9inc/mizu/releases) page.
|
||||
|
||||
### Development (unstable) build
|
||||
@@ -37,7 +29,7 @@ Pick one from the [Releases](https://github.com/up9inc/mizu/releases) page.
|
||||
## How to run
|
||||
|
||||
1. Find pod you'd like to tap to in your Kubernetes cluster
|
||||
2. Run `mizu PODNAME` or `mizu REGEX`
|
||||
2. Run `mizu tap PODNAME` or `mizu tap REGEX`
|
||||
3. Open browser on `http://localhost:8899` as instructed ..
|
||||
4. Watch the WebAPI traffic flowing ..
|
||||
5. Type ^C to stop
|
||||
|
||||
@@ -11,16 +11,12 @@ require (
|
||||
github.com/go-playground/universal-translator v0.17.0
|
||||
github.com/go-playground/validator/v10 v10.5.0
|
||||
github.com/gofiber/fiber/v2 v2.8.0
|
||||
github.com/google/gopacket v1.1.19
|
||||
github.com/google/martian v2.1.0+incompatible
|
||||
github.com/gorilla/websocket v1.4.2
|
||||
github.com/leodido/go-urn v1.2.1 // indirect
|
||||
github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible
|
||||
github.com/up9inc/mizu/shared v0.0.0
|
||||
github.com/up9inc/mizu/tap v0.0.0
|
||||
go.mongodb.org/mongo-driver v1.5.1
|
||||
golang.org/x/net v0.0.0-20210421230115-4e50805a0758
|
||||
gorm.io/driver/sqlite v1.1.4
|
||||
gorm.io/gorm v1.21.8
|
||||
k8s.io/api v0.21.0
|
||||
@@ -29,4 +25,5 @@ require (
|
||||
)
|
||||
|
||||
replace github.com/up9inc/mizu/shared v0.0.0 => ../shared
|
||||
|
||||
replace github.com/up9inc/mizu/tap v0.0.0 => ../tap
|
||||
|
||||
@@ -251,7 +251,6 @@ github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGV
|
||||
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
|
||||
github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231 h1:fa50YL1pzKW+1SsBnJDOHppJN9stOEwS+CRWyUtyYGU=
|
||||
github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI=
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
|
||||
github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE=
|
||||
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
|
||||
|
||||
11
api/main.go
11
api/main.go
@@ -37,7 +37,7 @@ func main() {
|
||||
harOutputChannel, outboundLinkOutputChannel := tap.StartPassiveTapper(tapOpts)
|
||||
filteredHarChannel := make(chan *tap.OutputChannelItem)
|
||||
|
||||
go filterHarHeaders(harOutputChannel, filteredHarChannel, getTrafficFilteringOptions())
|
||||
go filterHarItems(harOutputChannel, filteredHarChannel, getTrafficFilteringOptions())
|
||||
go api.StartReadingEntries(filteredHarChannel, nil)
|
||||
go api.StartReadingOutbound(outboundLinkOutputChannel)
|
||||
|
||||
@@ -66,8 +66,8 @@ func main() {
|
||||
socketHarOutChannel := make(chan *tap.OutputChannelItem, 1000)
|
||||
filteredHarChannel := make(chan *tap.OutputChannelItem)
|
||||
|
||||
go filterHarItems(socketHarOutChannel, filteredHarChannel, getTrafficFilteringOptions())
|
||||
go api.StartReadingEntries(filteredHarChannel, nil)
|
||||
go filterHarHeaders(socketHarOutChannel, filteredHarChannel, getTrafficFilteringOptions())
|
||||
|
||||
hostApi(socketHarOutChannel)
|
||||
}
|
||||
@@ -125,9 +125,14 @@ func getTrafficFilteringOptions() *shared.TrafficFilteringOptions {
|
||||
return &filteringOptions
|
||||
}
|
||||
|
||||
func filterHarHeaders(inChannel <- chan *tap.OutputChannelItem, outChannel chan *tap.OutputChannelItem, filterOptions *shared.TrafficFilteringOptions) {
|
||||
func filterHarItems(inChannel <- chan *tap.OutputChannelItem, outChannel chan *tap.OutputChannelItem, filterOptions *shared.TrafficFilteringOptions) {
|
||||
for message := range inChannel {
|
||||
if message.ConnectionInfo.IsOutgoing && api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) {
|
||||
continue
|
||||
}
|
||||
|
||||
sensitiveDataFiltering.FilterSensitiveInfoFromHarRequest(message, filterOptions)
|
||||
|
||||
outChannel <- message
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,10 +5,6 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"mizuserver/pkg/database"
|
||||
"mizuserver/pkg/models"
|
||||
"mizuserver/pkg/resolver"
|
||||
"mizuserver/pkg/utils"
|
||||
"net/url"
|
||||
"os"
|
||||
"path"
|
||||
@@ -19,6 +15,11 @@ import (
|
||||
"github.com/google/martian/har"
|
||||
"github.com/up9inc/mizu/tap"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
|
||||
"mizuserver/pkg/database"
|
||||
"mizuserver/pkg/models"
|
||||
"mizuserver/pkg/resolver"
|
||||
"mizuserver/pkg/utils"
|
||||
)
|
||||
|
||||
var k8sResolver *resolver.Resolver
|
||||
@@ -84,7 +85,14 @@ func startReadingFiles(workingDir string) {
|
||||
|
||||
for _, entry := range inputHar.Log.Entries {
|
||||
time.Sleep(time.Millisecond * 250)
|
||||
saveHarToDb(entry, fileInfo.Name())
|
||||
connectionInfo := &tap.ConnectionInfo{
|
||||
ClientIP: fileInfo.Name(),
|
||||
ClientPort: "",
|
||||
ServerIP: "",
|
||||
ServerPort: "",
|
||||
IsOutgoing: false,
|
||||
}
|
||||
saveHarToDb(entry, connectionInfo)
|
||||
}
|
||||
rmErr := os.Remove(inputFilePath)
|
||||
utils.CheckErr(rmErr)
|
||||
@@ -97,7 +105,7 @@ func startReadingChannel(outputItems <-chan *tap.OutputChannelItem) {
|
||||
}
|
||||
|
||||
for item := range outputItems {
|
||||
saveHarToDb(item.HarEntry, item.ConnectionInfo.ClientIP)
|
||||
saveHarToDb(item.HarEntry, item.ConnectionInfo)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -109,17 +117,17 @@ func StartReadingOutbound(outboundLinkChannel <-chan *tap.OutboundLink) {
|
||||
}
|
||||
|
||||
|
||||
func saveHarToDb(entry *har.Entry, sender string) {
|
||||
func saveHarToDb(entry *har.Entry, connectionInfo *tap.ConnectionInfo) {
|
||||
entryBytes, _ := json.Marshal(entry)
|
||||
serviceName, urlPath, serviceHostName := getServiceNameFromUrl(entry.Request.URL)
|
||||
serviceName, urlPath := getServiceNameFromUrl(entry.Request.URL)
|
||||
entryId := primitive.NewObjectID().Hex()
|
||||
var (
|
||||
resolvedSource string
|
||||
resolvedDestination string
|
||||
)
|
||||
if k8sResolver != nil {
|
||||
resolvedSource = k8sResolver.Resolve(sender)
|
||||
resolvedDestination = k8sResolver.Resolve(serviceHostName)
|
||||
resolvedSource = k8sResolver.Resolve(connectionInfo.ClientIP)
|
||||
resolvedDestination = k8sResolver.Resolve(fmt.Sprintf("%s:%s", connectionInfo.ServerIP, connectionInfo.ServerPort))
|
||||
}
|
||||
mizuEntry := models.MizuEntry{
|
||||
EntryId: entryId,
|
||||
@@ -129,10 +137,11 @@ func saveHarToDb(entry *har.Entry, sender string) {
|
||||
Path: urlPath,
|
||||
Method: entry.Request.Method,
|
||||
Status: entry.Response.Status,
|
||||
RequestSenderIp: sender,
|
||||
RequestSenderIp: connectionInfo.ClientIP,
|
||||
Timestamp: entry.StartedDateTime.UnixNano() / int64(time.Millisecond),
|
||||
ResolvedSource: resolvedSource,
|
||||
ResolvedDestination: resolvedDestination,
|
||||
IsOutgoing: connectionInfo.IsOutgoing,
|
||||
}
|
||||
database.GetEntriesTable().Create(&mizuEntry)
|
||||
|
||||
@@ -141,8 +150,12 @@ func saveHarToDb(entry *har.Entry, sender string) {
|
||||
broadcastToBrowserClients(baseEntryBytes)
|
||||
}
|
||||
|
||||
func getServiceNameFromUrl(inputUrl string) (string, string, string) {
|
||||
func getServiceNameFromUrl(inputUrl string) (string, string) {
|
||||
parsed, err := url.Parse(inputUrl)
|
||||
utils.CheckErr(err)
|
||||
return fmt.Sprintf("%s://%s", parsed.Scheme, parsed.Host), parsed.Path, parsed.Host
|
||||
return fmt.Sprintf("%s://%s", parsed.Scheme, parsed.Host), parsed.Path
|
||||
}
|
||||
|
||||
func CheckIsServiceIP(address string) bool {
|
||||
return k8sResolver.CheckIsServiceIP(address)
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"mizuserver/pkg/controllers"
|
||||
"mizuserver/pkg/models"
|
||||
"mizuserver/pkg/routes"
|
||||
"mizuserver/pkg/up9"
|
||||
)
|
||||
|
||||
var browserClientSocketUUIDs = make([]string, 0)
|
||||
@@ -18,6 +19,9 @@ type RoutesEventHandlers struct {
|
||||
SocketHarOutChannel chan<- *tap.OutputChannelItem
|
||||
}
|
||||
|
||||
func init() {
|
||||
go up9.UpdateAnalyzeStatus(broadcastToBrowserClients)
|
||||
}
|
||||
|
||||
func (h *RoutesEventHandlers) WebSocketConnect(ep *ikisocket.EventPayload) {
|
||||
if ep.Kws.GetAttribute("is_tapper") == true {
|
||||
@@ -84,7 +88,6 @@ func (h *RoutesEventHandlers) WebSocketMessage(ep *ikisocket.EventPayload) {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
func removeSocketUUIDFromBrowserSlice(uuidToRemove string) {
|
||||
newUUIDSlice := make([]string, 0, len(browserClientSocketUUIDs))
|
||||
for _, uuid := range browserClientSocketUUIDs {
|
||||
|
||||
@@ -7,29 +7,13 @@ import (
|
||||
"github.com/google/martian/har"
|
||||
"mizuserver/pkg/database"
|
||||
"mizuserver/pkg/models"
|
||||
"mizuserver/pkg/up9"
|
||||
"mizuserver/pkg/utils"
|
||||
"mizuserver/pkg/validation"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
OrderDesc = "desc"
|
||||
OrderAsc = "asc"
|
||||
LT = "lt"
|
||||
GT = "gt"
|
||||
)
|
||||
|
||||
var (
|
||||
operatorToSymbolMapping = map[string]string{
|
||||
LT: "<",
|
||||
GT: ">",
|
||||
}
|
||||
operatorToOrderMapping = map[string]string{
|
||||
LT: OrderDesc,
|
||||
GT: OrderAsc,
|
||||
}
|
||||
)
|
||||
|
||||
func GetEntries(c *fiber.Ctx) error {
|
||||
entriesFilter := &models.EntriesFilter{}
|
||||
|
||||
@@ -41,8 +25,8 @@ func GetEntries(c *fiber.Ctx) error {
|
||||
return c.Status(fiber.StatusBadRequest).JSON(err)
|
||||
}
|
||||
|
||||
order := operatorToOrderMapping[entriesFilter.Operator]
|
||||
operatorSymbol := operatorToSymbolMapping[entriesFilter.Operator]
|
||||
order := database.OperatorToOrderMapping[entriesFilter.Operator]
|
||||
operatorSymbol := database.OperatorToSymbolMapping[entriesFilter.Operator]
|
||||
var entries []models.MizuEntry
|
||||
database.GetEntriesTable().
|
||||
Order(fmt.Sprintf("timestamp %s", order)).
|
||||
@@ -51,7 +35,7 @@ func GetEntries(c *fiber.Ctx) error {
|
||||
Limit(entriesFilter.Limit).
|
||||
Find(&entries)
|
||||
|
||||
if len(entries) > 0 && order == OrderDesc {
|
||||
if len(entries) > 0 && order == database.OrderDesc {
|
||||
// the entries always order from oldest to newest so we should revers
|
||||
utils.ReverseSlice(entries)
|
||||
}
|
||||
@@ -67,7 +51,7 @@ func GetEntries(c *fiber.Ctx) error {
|
||||
|
||||
func GetHARs(c *fiber.Ctx) error {
|
||||
entriesFilter := &models.HarFetchRequestBody{}
|
||||
order := OrderDesc
|
||||
order := database.OrderDesc
|
||||
if err := c.QueryParser(entriesFilter); err != nil {
|
||||
return c.Status(fiber.StatusBadRequest).JSON(err)
|
||||
}
|
||||
@@ -105,9 +89,20 @@ func GetHARs(c *fiber.Ctx) error {
|
||||
for _, entryData := range entries {
|
||||
var harEntry har.Entry
|
||||
_ = json.Unmarshal([]byte(entryData.Entry), &harEntry)
|
||||
if entryData.ResolvedDestination != "" {
|
||||
harEntry.Request.URL = utils.SetHostname(harEntry.Request.URL, entryData.ResolvedDestination)
|
||||
}
|
||||
|
||||
var fileName string
|
||||
sourceOfEntry := entryData.ResolvedSource
|
||||
fileName := fmt.Sprintf("%s.har", sourceOfEntry)
|
||||
if sourceOfEntry != "" {
|
||||
// naively assumes the proper service source is http
|
||||
sourceOfEntry = fmt.Sprintf("http://%s", sourceOfEntry)
|
||||
//replace / from the file name cause they end up creating a corrupted folder
|
||||
fileName = fmt.Sprintf("%s.har", strings.ReplaceAll(sourceOfEntry, "/", "_"))
|
||||
} else {
|
||||
fileName = "unknown_source.har"
|
||||
}
|
||||
if harOfSource, ok := harsObject[fileName]; ok {
|
||||
harOfSource.Log.Entries = append(harOfSource.Log.Entries, &harEntry)
|
||||
} else {
|
||||
@@ -121,11 +116,14 @@ func GetHARs(c *fiber.Ctx) error {
|
||||
Name: "mizu",
|
||||
Version: "0.0.2",
|
||||
},
|
||||
Source: sourceOfEntry,
|
||||
},
|
||||
Entries: entriesHar,
|
||||
},
|
||||
}
|
||||
// leave undefined when no source is present, otherwise modeler assumes source is empty string ""
|
||||
if sourceOfEntry != "" {
|
||||
harsObject[fileName].Log.Creator.Source = &sourceOfEntry
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -138,9 +136,25 @@ func GetHARs(c *fiber.Ctx) error {
|
||||
return c.Status(fiber.StatusOK).SendStream(buffer)
|
||||
}
|
||||
|
||||
func UploadEntries(c *fiber.Ctx) error {
|
||||
uploadRequestBody := &models.UploadEntriesRequestBody{}
|
||||
if err := c.QueryParser(uploadRequestBody); err != nil {
|
||||
return c.Status(fiber.StatusBadRequest).JSON(err)
|
||||
}
|
||||
if err := validation.Validate(uploadRequestBody); err != nil {
|
||||
return c.Status(fiber.StatusBadRequest).JSON(err)
|
||||
}
|
||||
if up9.GetAnalyzeInfo().IsAnalyzing {
|
||||
return c.Status(fiber.StatusBadRequest).SendString("Cannot analyze, mizu is already analyzing")
|
||||
}
|
||||
|
||||
token, _ := up9.CreateAnonymousToken(uploadRequestBody.Dest)
|
||||
go up9.UploadEntriesImpl(token.Token, token.Model, uploadRequestBody.Dest)
|
||||
return c.Status(fiber.StatusOK).SendString("OK")
|
||||
}
|
||||
|
||||
func GetFullEntries(c *fiber.Ctx) error {
|
||||
entriesFilter := &models.HarFetchRequestBody{}
|
||||
order := OrderDesc
|
||||
if err := c.QueryParser(entriesFilter); err != nil {
|
||||
return c.Status(fiber.StatusBadRequest).JSON(err)
|
||||
}
|
||||
@@ -162,23 +176,7 @@ func GetFullEntries(c *fiber.Ctx) error {
|
||||
timestampTo = entriesFilter.To
|
||||
}
|
||||
|
||||
var entries []models.MizuEntry
|
||||
database.GetEntriesTable().
|
||||
Where(fmt.Sprintf("timestamp BETWEEN %v AND %v", timestampFrom, timestampTo)).
|
||||
Order(fmt.Sprintf("timestamp %s", order)).
|
||||
Find(&entries)
|
||||
|
||||
if len(entries) > 0 {
|
||||
// the entries always order from oldest to newest so we should revers
|
||||
utils.ReverseSlice(entries)
|
||||
}
|
||||
|
||||
entriesArray := make([]har.Entry, 0)
|
||||
for _, entryData := range entries {
|
||||
var harEntry har.Entry
|
||||
_ = json.Unmarshal([]byte(entryData.Entry), &harEntry)
|
||||
entriesArray = append(entriesArray, harEntry)
|
||||
}
|
||||
entriesArray := database.GetEntriesFromDb(timestampFrom, timestampTo)
|
||||
return c.Status(fiber.StatusOK).JSON(entriesArray)
|
||||
}
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ package controllers
|
||||
import (
|
||||
"github.com/gofiber/fiber/v2"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"mizuserver/pkg/up9"
|
||||
)
|
||||
|
||||
var TapStatus shared.TapStatus
|
||||
@@ -10,3 +11,7 @@ var TapStatus shared.TapStatus
|
||||
func GetTappingStatus(c *fiber.Ctx) error {
|
||||
return c.Status(fiber.StatusOK).JSON(TapStatus)
|
||||
}
|
||||
|
||||
func AnalyzeInformation(c *fiber.Ctx) error {
|
||||
return c.Status(fiber.StatusOK).JSON(up9.GetAnalyzeInfo())
|
||||
}
|
||||
|
||||
@@ -1,9 +1,13 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/google/martian/har"
|
||||
"gorm.io/driver/sqlite"
|
||||
"gorm.io/gorm"
|
||||
"mizuserver/pkg/models"
|
||||
"mizuserver/pkg/utils"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -14,6 +18,24 @@ var (
|
||||
DB = initDataBase(DBPath)
|
||||
)
|
||||
|
||||
const (
|
||||
OrderDesc = "desc"
|
||||
OrderAsc = "asc"
|
||||
LT = "lt"
|
||||
GT = "gt"
|
||||
)
|
||||
|
||||
var (
|
||||
OperatorToSymbolMapping = map[string]string{
|
||||
LT: "<",
|
||||
GT: ">",
|
||||
}
|
||||
OperatorToOrderMapping = map[string]string{
|
||||
LT: OrderDesc,
|
||||
GT: OrderAsc,
|
||||
}
|
||||
)
|
||||
|
||||
func GetEntriesTable() *gorm.DB {
|
||||
return DB.Table("mizu_entries")
|
||||
}
|
||||
@@ -23,3 +45,34 @@ func initDataBase(databasePath string) *gorm.DB {
|
||||
_ = temp.AutoMigrate(&models.MizuEntry{}) // this will ensure table is created
|
||||
return temp
|
||||
}
|
||||
|
||||
func GetEntriesFromDb(timestampFrom int64, timestampTo int64) []har.Entry {
|
||||
order := OrderDesc
|
||||
var entries []models.MizuEntry
|
||||
GetEntriesTable().
|
||||
Where(fmt.Sprintf("timestamp BETWEEN %v AND %v", timestampFrom, timestampTo)).
|
||||
Order(fmt.Sprintf("timestamp %s", order)).
|
||||
Find(&entries)
|
||||
|
||||
if len(entries) > 0 {
|
||||
// the entries always order from oldest to newest so we should revers
|
||||
utils.ReverseSlice(entries)
|
||||
}
|
||||
|
||||
entriesArray := make([]har.Entry, 0)
|
||||
for _, entryData := range entries {
|
||||
var harEntry har.Entry
|
||||
_ = json.Unmarshal([]byte(entryData.Entry), &harEntry)
|
||||
|
||||
if entryData.ResolvedSource != "" {
|
||||
harEntry.Request.Headers = append(harEntry.Request.Headers, har.Header{Name: "x-mizu-source", Value: entryData.ResolvedSource})
|
||||
}
|
||||
if entryData.ResolvedDestination != "" {
|
||||
harEntry.Request.Headers = append(harEntry.Request.Headers, har.Header{Name: "x-mizu-destination", Value: entryData.ResolvedDestination})
|
||||
}
|
||||
|
||||
entriesArray = append(entriesArray, harEntry)
|
||||
}
|
||||
return entriesArray
|
||||
}
|
||||
|
||||
|
||||
@@ -23,6 +23,7 @@ type MizuEntry struct {
|
||||
Path string `json:"path" gorm:"column:path"`
|
||||
ResolvedSource string `json:"resolvedSource,omitempty" gorm:"column:resolvedSource"`
|
||||
ResolvedDestination string `json:"resolvedDestination,omitempty" gorm:"column:resolvedDestination"`
|
||||
IsOutgoing bool `json:"isOutgoing,omitempty" gorm:"column:isOutgoing"`
|
||||
}
|
||||
|
||||
type BaseEntryDetails struct {
|
||||
@@ -34,6 +35,7 @@ type BaseEntryDetails struct {
|
||||
StatusCode int `json:"statusCode,omitempty"`
|
||||
Method string `json:"method,omitempty"`
|
||||
Timestamp int64 `json:"timestamp,omitempty"`
|
||||
IsOutgoing bool `json:"isOutgoing,omitempty"`
|
||||
}
|
||||
|
||||
type EntryData struct {
|
||||
@@ -47,6 +49,10 @@ type EntriesFilter struct {
|
||||
Timestamp int64 `query:"timestamp" validate:"required,min=1"`
|
||||
}
|
||||
|
||||
type UploadEntriesRequestBody struct {
|
||||
Dest string `query:"dest"`
|
||||
}
|
||||
|
||||
type HarFetchRequestBody struct {
|
||||
From int64 `query:"from"`
|
||||
To int64 `query:"to"`
|
||||
@@ -99,5 +105,5 @@ type ExtendedLog struct {
|
||||
|
||||
type ExtendedCreator struct {
|
||||
*har.Creator
|
||||
Source string `json:"_source"`
|
||||
Source *string `json:"_source"`
|
||||
}
|
||||
|
||||
@@ -21,7 +21,7 @@ func NewFromInCluster(errOut chan error) (*Resolver, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Resolver{clientConfig: config, clientSet: clientset, nameMap: make(map[string]string), errOut: errOut}, nil
|
||||
return &Resolver{clientConfig: config, clientSet: clientset, nameMap: make(map[string]string), serviceMap: make(map[string]string), errOut: errOut}, nil
|
||||
}
|
||||
|
||||
func NewFromOutOfCluster(kubeConfigPath string, errOut chan error) (*Resolver, error) {
|
||||
@@ -53,9 +53,9 @@ func NewFromOutOfCluster(kubeConfigPath string, errOut chan error) (*Resolver, e
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Resolver{clientConfig: clientConfig, clientSet: clientset, nameMap: make(map[string]string), errOut: errOut}, nil
|
||||
return &Resolver{clientConfig: clientConfig, clientSet: clientset, nameMap: make(map[string]string), serviceMap: make(map[string]string), errOut: errOut}, nil
|
||||
}
|
||||
|
||||
func NewFromExisting(clientConfig *restclient.Config, clientSet *kubernetes.Clientset, errOut chan error) *Resolver {
|
||||
return &Resolver{clientConfig: clientConfig, clientSet: clientSet, nameMap: make(map[string]string), errOut: errOut}
|
||||
return &Resolver{clientConfig: clientConfig, clientSet: clientSet, nameMap: make(map[string]string), serviceMap: make(map[string]string), errOut: errOut}
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ type Resolver struct {
|
||||
clientConfig *restclient.Config
|
||||
clientSet *kubernetes.Clientset
|
||||
nameMap map[string]string
|
||||
serviceMap map[string]string
|
||||
isStarted bool
|
||||
errOut chan error
|
||||
}
|
||||
@@ -41,6 +42,11 @@ func (resolver *Resolver) Resolve(name string) string {
|
||||
return resolvedName
|
||||
}
|
||||
|
||||
func (resolver *Resolver) CheckIsServiceIP(address string) bool {
|
||||
_, isFound := resolver.serviceMap[address]
|
||||
return isFound
|
||||
}
|
||||
|
||||
func (resolver *Resolver) watchPods(ctx context.Context) error {
|
||||
// empty namespace makes the client watch all namespaces
|
||||
watcher, err := resolver.clientSet.CoreV1().Pods("").Watch(ctx, metav1.ListOptions{Watch: true})
|
||||
@@ -124,6 +130,7 @@ func (resolver *Resolver) watchServices(ctx context.Context) error {
|
||||
serviceHostname := fmt.Sprintf("%s.%s", service.Name, service.Namespace)
|
||||
if service.Spec.ClusterIP != "" && service.Spec.ClusterIP != kubClientNullString {
|
||||
resolver.saveResolvedName(service.Spec.ClusterIP, serviceHostname, event.Type)
|
||||
resolver.saveServiceIP(service.Spec.ClusterIP, serviceHostname, event.Type)
|
||||
}
|
||||
if service.Status.LoadBalancer.Ingress != nil {
|
||||
for _, ingress := range service.Status.LoadBalancer.Ingress {
|
||||
@@ -147,6 +154,14 @@ func (resolver *Resolver) saveResolvedName(key string, resolved string, eventTyp
|
||||
}
|
||||
}
|
||||
|
||||
func (resolver *Resolver) saveServiceIP(key string, resolved string, eventType watch.EventType) {
|
||||
if eventType == watch.Deleted {
|
||||
delete(resolver.serviceMap, key)
|
||||
} else {
|
||||
resolver.serviceMap[key] = resolved
|
||||
}
|
||||
}
|
||||
|
||||
func (resolver *Resolver) infiniteErrorHandleRetryFunc(ctx context.Context, fun func(ctx context.Context) error) {
|
||||
for {
|
||||
err := fun(ctx)
|
||||
|
||||
@@ -12,7 +12,7 @@ func EntriesRoutes(fiberApp *fiber.App) {
|
||||
routeGroup.Get("/entries", controllers.GetEntries) // get entries (base/thin entries)
|
||||
routeGroup.Get("/entries/:entryId", controllers.GetEntry) // get single (full) entry
|
||||
routeGroup.Get("/exportEntries", controllers.GetFullEntries)
|
||||
|
||||
routeGroup.Get("/uploadEntries", controllers.UploadEntries)
|
||||
|
||||
routeGroup.Get("/har", controllers.GetHARs)
|
||||
|
||||
@@ -20,4 +20,5 @@ func EntriesRoutes(fiberApp *fiber.App) {
|
||||
routeGroup.Get("/generalStats", controllers.GetGeneralStats) // get general stats about entries in DB
|
||||
|
||||
routeGroup.Get("/tapStatus", controllers.GetTappingStatus) // get tapping status
|
||||
routeGroup.Get("/analyzeStatus", controllers.AnalyzeInformation)
|
||||
}
|
||||
|
||||
178
api/pkg/up9/main.go
Normal file
178
api/pkg/up9/main.go
Normal file
@@ -0,0 +1,178 @@
|
||||
package up9
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"compress/zlib"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"mizuserver/pkg/database"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"time"
|
||||
)
|
||||
|
||||
|
||||
const (
|
||||
AnalyzeCheckSleepTime = 5 * time.Second
|
||||
)
|
||||
|
||||
type GuestToken struct {
|
||||
Token string `json:"token"`
|
||||
Model string `json:"model"`
|
||||
}
|
||||
|
||||
type ModelStatus struct {
|
||||
LastMajorGeneration float64 `json:"lastMajorGeneration"`
|
||||
}
|
||||
|
||||
func getGuestToken(url string, target *GuestToken) error {
|
||||
resp, err := http.Get(url)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
return json.NewDecoder(resp.Body).Decode(target)
|
||||
}
|
||||
|
||||
func CreateAnonymousToken(envPrefix string) (*GuestToken, error) {
|
||||
tokenUrl := fmt.Sprintf("https://trcc.%v/anonymous/token", envPrefix)
|
||||
token := &GuestToken{}
|
||||
if err := getGuestToken(tokenUrl, token); err != nil {
|
||||
fmt.Println(err)
|
||||
return nil, err
|
||||
}
|
||||
return token, nil
|
||||
}
|
||||
|
||||
func GetRemoteUrl(analyzeDestination string, analyzeToken string) string {
|
||||
return fmt.Sprintf("https://%s/share/%s", analyzeDestination, analyzeToken)
|
||||
}
|
||||
|
||||
func CheckIfModelReady(analyzeDestination string, analyzeModel string, analyzeToken string) bool {
|
||||
statusUrl, _ := url.Parse(fmt.Sprintf("https://trcc.%s/models/%s/status", analyzeDestination, analyzeModel))
|
||||
req := &http.Request{
|
||||
Method: http.MethodGet,
|
||||
URL: statusUrl,
|
||||
Header: map[string][]string{
|
||||
"Content-Type": {"application/json"},
|
||||
"Guest-Auth": {analyzeToken},
|
||||
},
|
||||
}
|
||||
statusResp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
defer statusResp.Body.Close()
|
||||
|
||||
target := &ModelStatus{}
|
||||
_ = json.NewDecoder(statusResp.Body).Decode(&target)
|
||||
|
||||
return target.LastMajorGeneration > 0
|
||||
}
|
||||
|
||||
func GetTrafficDumpUrl(analyzeDestination string, analyzeModel string) *url.URL {
|
||||
postUrl, _ := url.Parse(fmt.Sprintf("https://traffic.%s/dumpTrafficBulk/%s", analyzeDestination, analyzeModel))
|
||||
return postUrl
|
||||
}
|
||||
|
||||
type AnalyzeInformation struct {
|
||||
IsAnalyzing bool
|
||||
AnalyzedModel string
|
||||
AnalyzeToken string
|
||||
AnalyzeDestination string
|
||||
}
|
||||
|
||||
func (info *AnalyzeInformation) Reset() {
|
||||
info.IsAnalyzing = false
|
||||
info.AnalyzedModel = ""
|
||||
info.AnalyzeToken = ""
|
||||
info.AnalyzeDestination = ""
|
||||
}
|
||||
|
||||
var analyzeInformation = &AnalyzeInformation{}
|
||||
|
||||
func GetAnalyzeInfo() *shared.AnalyzeStatus {
|
||||
return &shared.AnalyzeStatus{
|
||||
IsAnalyzing: analyzeInformation.IsAnalyzing,
|
||||
RemoteUrl: GetRemoteUrl(analyzeInformation.AnalyzeDestination, analyzeInformation.AnalyzeToken),
|
||||
IsRemoteReady: CheckIfModelReady(analyzeInformation.AnalyzeDestination, analyzeInformation.AnalyzedModel, analyzeInformation.AnalyzeToken),
|
||||
}
|
||||
}
|
||||
|
||||
func UploadEntriesImpl(token string, model string, envPrefix string) {
|
||||
analyzeInformation.IsAnalyzing = true
|
||||
analyzeInformation.AnalyzedModel = model
|
||||
analyzeInformation.AnalyzeToken = token
|
||||
analyzeInformation.AnalyzeDestination = envPrefix
|
||||
|
||||
sleepTime := time.Second * 10
|
||||
|
||||
var timestampFrom int64 = 0
|
||||
|
||||
for {
|
||||
timestampTo := time.Now().UnixNano() / int64(time.Millisecond)
|
||||
fmt.Printf("Getting entries from %v, to %v\n", timestampFrom, timestampTo)
|
||||
entriesArray := database.GetEntriesFromDb(timestampFrom, timestampTo)
|
||||
|
||||
if len(entriesArray) > 0 {
|
||||
fmt.Printf("About to upload %v entries\n", len(entriesArray))
|
||||
|
||||
body, jMarshalErr := json.Marshal(entriesArray)
|
||||
if jMarshalErr != nil {
|
||||
analyzeInformation.Reset()
|
||||
fmt.Println("Stopping analyzing")
|
||||
log.Fatal(jMarshalErr)
|
||||
}
|
||||
|
||||
var in bytes.Buffer
|
||||
w := zlib.NewWriter(&in)
|
||||
_, _ = w.Write(body)
|
||||
_ = w.Close()
|
||||
reqBody := ioutil.NopCloser(bytes.NewReader(in.Bytes()))
|
||||
|
||||
req := &http.Request{
|
||||
Method: http.MethodPost,
|
||||
URL: GetTrafficDumpUrl(envPrefix, model),
|
||||
Header: map[string][]string{
|
||||
"Content-Encoding": {"deflate"},
|
||||
"Content-Type": {"application/octet-stream"},
|
||||
"Guest-Auth": {token},
|
||||
},
|
||||
Body: reqBody,
|
||||
}
|
||||
|
||||
if _, postErr := http.DefaultClient.Do(req); postErr != nil {
|
||||
analyzeInformation.Reset()
|
||||
log.Println("Stopping analyzing")
|
||||
log.Fatal(postErr)
|
||||
}
|
||||
fmt.Printf("Finish uploading %v entries to %s\n", len(entriesArray), GetTrafficDumpUrl(envPrefix, model))
|
||||
|
||||
} else {
|
||||
fmt.Println("Nothing to upload")
|
||||
}
|
||||
|
||||
fmt.Printf("Sleeping for %v...\n", sleepTime)
|
||||
time.Sleep(sleepTime)
|
||||
timestampFrom = timestampTo
|
||||
}
|
||||
}
|
||||
|
||||
func UpdateAnalyzeStatus(callback func(data []byte)) {
|
||||
for {
|
||||
if !analyzeInformation.IsAnalyzing {
|
||||
time.Sleep(AnalyzeCheckSleepTime)
|
||||
continue
|
||||
}
|
||||
analyzeStatus := GetAnalyzeInfo()
|
||||
socketMessage := shared.CreateWebSocketMessageTypeAnalyzeStatus(*analyzeStatus)
|
||||
|
||||
jsonMessage, _ := json.Marshal(socketMessage)
|
||||
callback(jsonMessage)
|
||||
time.Sleep(AnalyzeCheckSleepTime)
|
||||
}
|
||||
}
|
||||
@@ -70,14 +70,15 @@ func GetResolvedBaseEntry(entry models.MizuEntry) models.BaseEntryDetails {
|
||||
service = SetHostname(service, entry.ResolvedDestination)
|
||||
}
|
||||
return models.BaseEntryDetails{
|
||||
Id: entry.EntryId,
|
||||
Url: entryUrl,
|
||||
Service: service,
|
||||
Path: entry.Path,
|
||||
StatusCode: entry.Status,
|
||||
Method: entry.Method,
|
||||
Timestamp: entry.Timestamp,
|
||||
Id: entry.EntryId,
|
||||
Url: entryUrl,
|
||||
Service: service,
|
||||
Path: entry.Path,
|
||||
StatusCode: entry.Status,
|
||||
Method: entry.Method,
|
||||
Timestamp: entry.Timestamp,
|
||||
RequestSenderIp: entry.RequestSenderIp,
|
||||
IsOutgoing: entry.IsOutgoing,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -26,10 +26,10 @@ build-all: ## build for all supported platforms
|
||||
@mkdir -p bin && echo "SHA256 checksums available for compiled binaries \n\nRun \`shasum -a 256 -c mizu_OS_ARCH.sha256\` to verify\n\n" > bin/README.md
|
||||
@$(MAKE) build GOOS=darwin GOARCH=amd64
|
||||
@$(MAKE) build GOOS=linux GOARCH=amd64
|
||||
@# $(MAKE) build GOOS=darwin GOARCH=arm64
|
||||
@# $(MAKE) GOOS=windows GOARCH=amd64
|
||||
@# $(MAKE) GOOS=linux GOARCH=386
|
||||
@# $(MAKE) GOOS=windows GOARCH=386
|
||||
@$(MAKE) GOOS=darwin GOARCH=arm64
|
||||
@# $(MAKE) GOOS=linux GOARCH=arm64
|
||||
@# $(MAKE) GOOS=windows GOARCH=arm64
|
||||
@echo "---------"
|
||||
|
||||
@@ -3,8 +3,10 @@ package cmd
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/up9inc/mizu/cli/mizu"
|
||||
"regexp"
|
||||
"strings"
|
||||
|
||||
"github.com/up9inc/mizu/cli/mizu"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
@@ -13,20 +15,23 @@ type MizuTapOptions struct {
|
||||
GuiPort uint16
|
||||
Namespace string
|
||||
AllNamespaces bool
|
||||
Analyze bool
|
||||
AnalyzeDestination string
|
||||
KubeConfigPath string
|
||||
MizuImage string
|
||||
MizuPodPort uint16
|
||||
PlainTextFilterRegexes []string
|
||||
TapOutgoing bool
|
||||
}
|
||||
|
||||
|
||||
var mizuTapOptions = &MizuTapOptions{}
|
||||
var direction string
|
||||
|
||||
var tapCmd = &cobra.Command{
|
||||
Use: "tap [POD REGEX]",
|
||||
Short: "Record ingoing traffic of a kubernetes pod",
|
||||
Long: `Record the ingoing traffic of a kubernetes pod.
|
||||
Supported protocols are HTTP and gRPC.`,
|
||||
Supported protocols are HTTP and gRPC.`,
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
if len(args) == 0 {
|
||||
return errors.New("POD REGEX argument is required")
|
||||
@@ -39,6 +44,15 @@ var tapCmd = &cobra.Command{
|
||||
return errors.New(fmt.Sprintf("%s is not a valid regex %s", args[0], err))
|
||||
}
|
||||
|
||||
directionLowerCase := strings.ToLower(direction)
|
||||
if directionLowerCase == "any" {
|
||||
mizuTapOptions.TapOutgoing = true
|
||||
} else if directionLowerCase == "in" {
|
||||
mizuTapOptions.TapOutgoing = false
|
||||
} else {
|
||||
return errors.New(fmt.Sprintf("%s is not a valid value for flag --direction. Acceptable values are in/any.", direction))
|
||||
}
|
||||
|
||||
RunMizuTap(regex, mizuTapOptions)
|
||||
return nil
|
||||
},
|
||||
@@ -49,9 +63,12 @@ func init() {
|
||||
|
||||
tapCmd.Flags().Uint16VarP(&mizuTapOptions.GuiPort, "gui-port", "p", 8899, "Provide a custom port for the web interface webserver")
|
||||
tapCmd.Flags().StringVarP(&mizuTapOptions.Namespace, "namespace", "n", "", "Namespace selector")
|
||||
tapCmd.Flags().BoolVar(&mizuTapOptions.Analyze, "analyze", false, "Uploads traffic to UP9 cloud for further analysis (Beta)")
|
||||
tapCmd.Flags().StringVar(&mizuTapOptions.AnalyzeDestination, "dest", "up9.app", "Destination environment")
|
||||
tapCmd.Flags().BoolVarP(&mizuTapOptions.AllNamespaces, "all-namespaces", "A", false, "Tap all namespaces")
|
||||
tapCmd.Flags().StringVarP(&mizuTapOptions.KubeConfigPath, "kube-config", "k", "", "Path to kube-config file")
|
||||
tapCmd.Flags().StringVarP(&mizuTapOptions.MizuImage, "mizu-image", "", fmt.Sprintf("gcr.io/up9-docker-hub/mizu/%s:latest", mizu.Branch), "Custom image for mizu collector")
|
||||
tapCmd.Flags().Uint16VarP(&mizuTapOptions.MizuPodPort, "mizu-port", "", 8899, "Port which mizu cli will attempt to forward from the mizu collector pod")
|
||||
tapCmd.Flags().StringArrayVarP(&mizuTapOptions.PlainTextFilterRegexes, "regex-masking", "r", nil, "List of regex expressions that are used to filter matching values from text/plain http bodies")
|
||||
tapCmd.Flags().StringVarP(&direction, "direction", "", "in", "Record traffic that goes in this direction (relative to the tapped pod): in/any")
|
||||
}
|
||||
|
||||
@@ -3,13 +3,15 @@ package cmd
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"regexp"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/up9inc/mizu/shared"
|
||||
|
||||
core "k8s.io/api/core/v1"
|
||||
|
||||
"github.com/up9inc/mizu/cli/debounce"
|
||||
@@ -45,6 +47,22 @@ func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) {
|
||||
currentlyTappedPods = matchingPods
|
||||
}
|
||||
|
||||
var namespacesStr string
|
||||
if targetNamespace != mizu.K8sAllNamespaces {
|
||||
namespacesStr = fmt.Sprintf("namespace \"%s\"", targetNamespace)
|
||||
} else {
|
||||
namespacesStr = "all namespaces"
|
||||
}
|
||||
fmt.Printf("Tapping pods in %s\n", namespacesStr)
|
||||
|
||||
if len(currentlyTappedPods) == 0 {
|
||||
var suggestionStr string
|
||||
if targetNamespace != mizu.K8sAllNamespaces {
|
||||
suggestionStr = "\nSelect a different namespace with -n or tap all namespaces with -A"
|
||||
}
|
||||
fmt.Printf("Did not find any pods matching the regex argument%s\n", suggestionStr)
|
||||
}
|
||||
|
||||
nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(currentlyTappedPods)
|
||||
if err != nil {
|
||||
return
|
||||
@@ -60,16 +78,15 @@ func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) {
|
||||
|
||||
//block until exit signal or error
|
||||
waitForFinish(ctx, cancel)
|
||||
|
||||
// TODO handle incoming traffic from tapper using a channel
|
||||
}
|
||||
|
||||
|
||||
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions, mizuApiFilteringOptions *shared.TrafficFilteringOptions) error {
|
||||
if err := createMizuAggregator(ctx, kubernetesProvider, tappingOptions, mizuApiFilteringOptions); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := createMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil {
|
||||
if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -113,19 +130,27 @@ func getMizuApiFilteringOptions(tappingOptions *MizuTapOptions) (*shared.Traffic
|
||||
return &shared.TrafficFilteringOptions{PlainTextMaskingRegexes: compiledRegexSlice}, nil
|
||||
}
|
||||
|
||||
func createMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions) error {
|
||||
if err := kubernetesProvider.ApplyMizuTapperDaemonSet(
|
||||
ctx,
|
||||
mizu.ResourcesNamespace,
|
||||
mizu.TapperDaemonSetName,
|
||||
tappingOptions.MizuImage,
|
||||
mizu.TapperPodName,
|
||||
fmt.Sprintf("%s.%s.svc.cluster.local", aggregatorService.Name, aggregatorService.Namespace),
|
||||
nodeToTappedPodIPMap,
|
||||
mizuServiceAccountExists,
|
||||
); err != nil {
|
||||
fmt.Printf("Error creating mizu tapper daemonset: %v\n", err)
|
||||
return err
|
||||
func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions) error {
|
||||
if len(nodeToTappedPodIPMap) > 0 {
|
||||
if err := kubernetesProvider.ApplyMizuTapperDaemonSet(
|
||||
ctx,
|
||||
mizu.ResourcesNamespace,
|
||||
mizu.TapperDaemonSetName,
|
||||
tappingOptions.MizuImage,
|
||||
mizu.TapperPodName,
|
||||
fmt.Sprintf("%s.%s.svc.cluster.local", aggregatorService.Name, aggregatorService.Namespace),
|
||||
nodeToTappedPodIPMap,
|
||||
mizuServiceAccountExists,
|
||||
tappingOptions.TapOutgoing,
|
||||
); err != nil {
|
||||
fmt.Printf("Error creating mizu tapper daemonset: %v\n", err)
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if err := kubernetesProvider.RemoveDaemonSet(ctx, mizu.ResourcesNamespace, mizu.TapperDaemonSetName); err != nil {
|
||||
fmt.Printf("Error deleting mizu tapper daemonset: %v\n", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -165,7 +190,7 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
|
||||
cancel()
|
||||
}
|
||||
|
||||
if err := createMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil {
|
||||
if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil {
|
||||
fmt.Printf("Error updating daemonset: %s (%v,%+v)\n", err, err, err)
|
||||
cancel()
|
||||
}
|
||||
@@ -175,10 +200,10 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
|
||||
for {
|
||||
select {
|
||||
case newTarget := <-added:
|
||||
fmt.Printf("+%s\n", newTarget.Name)
|
||||
fmt.Printf(mizu.Green, fmt.Sprintf("+%s\n", newTarget.Name))
|
||||
|
||||
case removedTarget := <-removed:
|
||||
fmt.Printf("-%s\n", removedTarget.Name)
|
||||
fmt.Printf(mizu.Red, fmt.Sprintf("-%s\n", removedTarget.Name))
|
||||
restartTappersDebouncer.SetOn()
|
||||
|
||||
case modifiedTarget := <-modified:
|
||||
@@ -218,12 +243,21 @@ func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
|
||||
case modifiedPod := <-modified:
|
||||
if modifiedPod.Status.Phase == "Running" && !isPodReady {
|
||||
isPodReady = true
|
||||
var err error
|
||||
portForward, err = kubernetes.NewPortForward(kubernetesProvider, mizu.ResourcesNamespace, mizu.AggregatorPodName, tappingOptions.GuiPort, tappingOptions.MizuPodPort, cancel)
|
||||
fmt.Printf("Web interface is now available at http://localhost:%d\n", tappingOptions.GuiPort)
|
||||
if err != nil {
|
||||
fmt.Printf("error forwarding port to pod %s\n", err)
|
||||
var portForwardCreateError error
|
||||
if portForward, portForwardCreateError = kubernetes.NewPortForward(kubernetesProvider, mizu.ResourcesNamespace, mizu.AggregatorPodName, tappingOptions.GuiPort, tappingOptions.MizuPodPort, cancel); portForwardCreateError != nil {
|
||||
fmt.Printf("error forwarding port to pod %s\n", portForwardCreateError)
|
||||
cancel()
|
||||
} else {
|
||||
fmt.Printf("Web interface is now available at http://localhost:%d\n", tappingOptions.GuiPort)
|
||||
time.Sleep(time.Second * 5) // Waiting to be sure port forwarding finished
|
||||
if tappingOptions.Analyze {
|
||||
if _, err := http.Get(fmt.Sprintf("http://localhost:%d/api/uploadEntries?dest=%s", tappingOptions.GuiPort, tappingOptions.AnalyzeDestination)); err != nil {
|
||||
fmt.Println(err)
|
||||
} else {
|
||||
fmt.Printf(mizu.Purple, "Traffic is uploading to UP9 cloud for further analsys")
|
||||
fmt.Println()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -102,7 +102,6 @@ func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace
|
||||
},
|
||||
DNSPolicy: core.DNSClusterFirstWithHostNet,
|
||||
TerminationGracePeriodSeconds: new(int64),
|
||||
// Affinity: TODO: define node selector for all relevant nodes for this mizu instance
|
||||
},
|
||||
}
|
||||
//define the service account only when it exists to prevent pod crash
|
||||
@@ -215,30 +214,117 @@ func (provider *Provider) CreateMizuRBAC(ctx context.Context, namespace string,
|
||||
}
|
||||
|
||||
func (provider *Provider) RemovePod(ctx context.Context, namespace string, podName string) error {
|
||||
if isFound, err := provider.CheckPodExists(ctx, namespace, podName);
|
||||
err != nil {
|
||||
return err
|
||||
} else if !isFound {
|
||||
return nil
|
||||
}
|
||||
|
||||
return provider.clientSet.CoreV1().Pods(namespace).Delete(ctx, podName, metav1.DeleteOptions{})
|
||||
}
|
||||
|
||||
func (provider *Provider) RemoveService(ctx context.Context, namespace string, serviceName string) error {
|
||||
if isFound, err := provider.CheckServiceExists(ctx, namespace, serviceName);
|
||||
err != nil {
|
||||
return err
|
||||
} else if !isFound {
|
||||
return nil
|
||||
}
|
||||
|
||||
return provider.clientSet.CoreV1().Services(namespace).Delete(ctx, serviceName, metav1.DeleteOptions{})
|
||||
}
|
||||
|
||||
func (provider *Provider) RemoveDaemonSet(ctx context.Context, namespace string, daemonSetName string) error {
|
||||
if isFound, err := provider.CheckDaemonSetExists(ctx, namespace, daemonSetName);
|
||||
err != nil {
|
||||
return err
|
||||
} else if !isFound {
|
||||
return nil
|
||||
}
|
||||
|
||||
return provider.clientSet.AppsV1().DaemonSets(namespace).Delete(ctx, daemonSetName, metav1.DeleteOptions{})
|
||||
}
|
||||
|
||||
func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, aggregatorPodIp string, nodeToTappedPodIPMap map[string][]string, linkServiceAccount bool) error {
|
||||
func (provider *Provider) CheckPodExists(ctx context.Context, namespace string, name string) (bool, error) {
|
||||
listOptions := metav1.ListOptions{
|
||||
FieldSelector: fmt.Sprintf("metadata.name=%s", name),
|
||||
Limit: 1,
|
||||
}
|
||||
resourceList, err := provider.clientSet.CoreV1().Pods(namespace).List(ctx, listOptions)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if len(resourceList.Items) > 0 {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (provider *Provider) CheckServiceExists(ctx context.Context, namespace string, name string) (bool, error) {
|
||||
listOptions := metav1.ListOptions{
|
||||
FieldSelector: fmt.Sprintf("metadata.name=%s", name),
|
||||
Limit: 1,
|
||||
}
|
||||
resourceList, err := provider.clientSet.CoreV1().Services(namespace).List(ctx, listOptions)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if len(resourceList.Items) > 0 {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (provider *Provider) CheckDaemonSetExists(ctx context.Context, namespace string, name string) (bool, error) {
|
||||
listOptions := metav1.ListOptions{
|
||||
FieldSelector: fmt.Sprintf("metadata.name=%s", name),
|
||||
Limit: 1,
|
||||
}
|
||||
resourceList, err := provider.clientSet.AppsV1().DaemonSets(namespace).List(ctx, listOptions)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if len(resourceList.Items) > 0 {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, aggregatorPodIp string, nodeToTappedPodIPMap map[string][]string, linkServiceAccount bool, tapOutgoing bool) error {
|
||||
if len(nodeToTappedPodIPMap) == 0 {
|
||||
return fmt.Errorf("Daemon set %s must tap at least 1 pod", daemonSetName)
|
||||
}
|
||||
|
||||
nodeToTappedPodIPMapJsonStr, err := json.Marshal(nodeToTappedPodIPMap)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
mizuCmd := []string{
|
||||
"./mizuagent",
|
||||
"-i", "any",
|
||||
"--tap",
|
||||
"--hardump",
|
||||
"--aggregator-address", fmt.Sprintf("ws://%s/wsTapper", aggregatorPodIp),
|
||||
}
|
||||
if tapOutgoing {
|
||||
mizuCmd = append(mizuCmd, "--anydirection")
|
||||
}
|
||||
|
||||
privileged := true
|
||||
agentContainer := applyconfcore.Container()
|
||||
agentContainer.WithName(tapperPodName)
|
||||
agentContainer.WithImage(podImage)
|
||||
agentContainer.WithImagePullPolicy(core.PullAlways)
|
||||
agentContainer.WithSecurityContext(applyconfcore.SecurityContext().WithPrivileged(privileged))
|
||||
agentContainer.WithCommand("./mizuagent", "-i", "any", "--tap", "--hardump", "--aggregator-address", fmt.Sprintf("ws://%s/wsTapper", aggregatorPodIp))
|
||||
agentContainer.WithCommand(mizuCmd...)
|
||||
agentContainer.WithEnv(
|
||||
applyconfcore.EnvVar().WithName(shared.HostModeEnvVar).WithValue("1"),
|
||||
applyconfcore.EnvVar().WithName(shared.TappedAddressesPerNodeDictEnvVar).WithValue(string(nodeToTappedPodIPMapJsonStr)),
|
||||
|
||||
@@ -15,3 +15,14 @@ const (
|
||||
TapperPodName = "mizu-tapper"
|
||||
K8sAllNamespaces = ""
|
||||
)
|
||||
|
||||
const (
|
||||
Black = "\033[1;30m%s\033[0m"
|
||||
Red = "\033[1;31m%s\033[0m"
|
||||
Green = "\033[1;32m%s\033[0m"
|
||||
Yellow = "\033[1;33m%s\033[0m"
|
||||
Purple = "\033[1;34m%s\033[0m"
|
||||
Magenta = "\033[1;35m%s\033[0m"
|
||||
Teal = "\033[1;36m%s\033[0m"
|
||||
White = "\033[1;37m%s\033[0m"
|
||||
)
|
||||
|
||||
@@ -1,28 +1,41 @@
|
||||
package shared
|
||||
|
||||
type WebSocketMessageType string
|
||||
|
||||
const (
|
||||
WebSocketMessageTypeEntry WebSocketMessageType = "entry"
|
||||
WebSocketMessageTypeTappedEntry WebSocketMessageType = "tappedEntry"
|
||||
WebSocketMessageTypeUpdateStatus WebSocketMessageType = "status"
|
||||
WebSocketMessageTypeEntry WebSocketMessageType = "entry"
|
||||
WebSocketMessageTypeTappedEntry WebSocketMessageType = "tappedEntry"
|
||||
WebSocketMessageTypeUpdateStatus WebSocketMessageType = "status"
|
||||
WebSocketMessageTypeAnalyzeStatus WebSocketMessageType = "analyzeStatus"
|
||||
)
|
||||
|
||||
type WebSocketMessageMetadata struct {
|
||||
MessageType WebSocketMessageType `json:"messageType,omitempty"`
|
||||
}
|
||||
|
||||
type WebSocketAnalyzeStatusMessage struct {
|
||||
*WebSocketMessageMetadata
|
||||
AnalyzeStatus AnalyzeStatus `json:"analyzeStatus"`
|
||||
}
|
||||
|
||||
type AnalyzeStatus struct {
|
||||
IsAnalyzing bool `json:"isAnalyzing"`
|
||||
RemoteUrl string `json:"remoteUrl"`
|
||||
IsRemoteReady bool `json:"isRemoteReady"`
|
||||
}
|
||||
|
||||
type WebSocketStatusMessage struct {
|
||||
*WebSocketMessageMetadata
|
||||
TappingStatus TapStatus `json:"tappingStatus"`
|
||||
}
|
||||
|
||||
type TapStatus struct {
|
||||
Pods []PodInfo `json:"pods"`
|
||||
Pods []PodInfo `json:"pods"`
|
||||
}
|
||||
|
||||
type PodInfo struct {
|
||||
Namespace string `json:"namespace"`
|
||||
Name string `json:"name"`
|
||||
Name string `json:"name"`
|
||||
}
|
||||
|
||||
func CreateWebSocketStatusMessage(tappingStatus TapStatus) WebSocketStatusMessage {
|
||||
@@ -34,6 +47,15 @@ func CreateWebSocketStatusMessage(tappingStatus TapStatus) WebSocketStatusMessag
|
||||
}
|
||||
}
|
||||
|
||||
func CreateWebSocketMessageTypeAnalyzeStatus(analyzeStatus AnalyzeStatus) WebSocketAnalyzeStatusMessage {
|
||||
return WebSocketAnalyzeStatusMessage{
|
||||
WebSocketMessageMetadata: &WebSocketMessageMetadata{
|
||||
MessageType: WebSocketMessageTypeAnalyzeStatus,
|
||||
},
|
||||
AnalyzeStatus: analyzeStatus,
|
||||
}
|
||||
}
|
||||
|
||||
type TrafficFilteringOptions struct {
|
||||
PlainTextMaskingRegexes []*SerializableRegexp
|
||||
}
|
||||
|
||||
@@ -14,18 +14,10 @@ type requestResponsePair struct {
|
||||
Response httpMessage `json:"response"`
|
||||
}
|
||||
|
||||
type ConnectionInfo struct {
|
||||
ClientIP string
|
||||
ClientPort string
|
||||
ServerIP string
|
||||
ServerPort string
|
||||
}
|
||||
|
||||
type httpMessage struct {
|
||||
isRequest bool
|
||||
captureTime time.Time
|
||||
orig interface{}
|
||||
connectionInfo ConnectionInfo
|
||||
}
|
||||
|
||||
|
||||
@@ -44,18 +36,10 @@ func (matcher *requestResponseMatcher) registerRequest(ident string, request *ht
|
||||
split := splitIdent(ident)
|
||||
key := genKey(split)
|
||||
|
||||
connectionInfo := &ConnectionInfo{
|
||||
ClientIP: split[0],
|
||||
ClientPort: split[2],
|
||||
ServerIP: split[1],
|
||||
ServerPort: split[3],
|
||||
}
|
||||
|
||||
requestHTTPMessage := httpMessage{
|
||||
isRequest: true,
|
||||
captureTime: captureTime,
|
||||
orig: request,
|
||||
connectionInfo: *connectionInfo,
|
||||
}
|
||||
|
||||
if response, found := matcher.openMessagesMap.Pop(key); found {
|
||||
|
||||
@@ -24,6 +24,14 @@ type tcpID struct {
|
||||
dstPort string
|
||||
}
|
||||
|
||||
type ConnectionInfo struct {
|
||||
ClientIP string
|
||||
ClientPort string
|
||||
ServerIP string
|
||||
ServerPort string
|
||||
IsOutgoing bool
|
||||
}
|
||||
|
||||
func (tid *tcpID) String() string {
|
||||
return fmt.Sprintf("%s->%s %s->%s", tid.srcIP, tid.dstIP, tid.srcPort, tid.dstPort)
|
||||
}
|
||||
@@ -38,6 +46,7 @@ type httpReader struct {
|
||||
tcpID tcpID
|
||||
isClient bool
|
||||
isHTTP2 bool
|
||||
isOutgoing bool
|
||||
msgQueue chan httpReaderDataMsg // Channel of captured reassembled tcp payload
|
||||
data []byte
|
||||
captureTime time.Time
|
||||
@@ -121,13 +130,28 @@ func (h *httpReader) handleHTTP2Stream() error {
|
||||
}
|
||||
|
||||
var reqResPair *requestResponsePair
|
||||
var connectionInfo *ConnectionInfo
|
||||
|
||||
switch messageHTTP1 := messageHTTP1.(type) {
|
||||
case http.Request:
|
||||
ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.srcIP, h.tcpID.dstIP, h.tcpID.srcPort, h.tcpID.dstPort, streamID)
|
||||
connectionInfo = &ConnectionInfo{
|
||||
ClientIP: h.tcpID.srcIP,
|
||||
ClientPort: h.tcpID.srcPort,
|
||||
ServerIP: h.tcpID.dstIP,
|
||||
ServerPort: h.tcpID.dstPort,
|
||||
IsOutgoing: h.isOutgoing,
|
||||
}
|
||||
reqResPair = reqResMatcher.registerRequest(ident, &messageHTTP1, h.captureTime)
|
||||
case http.Response:
|
||||
ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.dstIP, h.tcpID.srcIP, h.tcpID.dstPort, h.tcpID.srcPort, streamID)
|
||||
connectionInfo = &ConnectionInfo{
|
||||
ClientIP: h.tcpID.dstIP,
|
||||
ClientPort: h.tcpID.dstPort,
|
||||
ServerIP: h.tcpID.srcIP,
|
||||
ServerPort: h.tcpID.srcPort,
|
||||
IsOutgoing: h.isOutgoing,
|
||||
}
|
||||
reqResPair = reqResMatcher.registerResponse(ident, &messageHTTP1, h.captureTime)
|
||||
}
|
||||
|
||||
@@ -140,7 +164,7 @@ func (h *httpReader) handleHTTP2Stream() error {
|
||||
reqResPair.Request.captureTime,
|
||||
reqResPair.Response.orig.(*http.Response),
|
||||
reqResPair.Response.captureTime,
|
||||
&reqResPair.Request.connectionInfo,
|
||||
connectionInfo,
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -179,7 +203,13 @@ func (h *httpReader) handleHTTP1ClientStream(b *bufio.Reader) error {
|
||||
reqResPair.Request.captureTime,
|
||||
reqResPair.Response.orig.(*http.Response),
|
||||
reqResPair.Response.captureTime,
|
||||
&reqResPair.Request.connectionInfo,
|
||||
&ConnectionInfo{
|
||||
ClientIP: h.tcpID.srcIP,
|
||||
ClientPort: h.tcpID.srcPort,
|
||||
ServerIP: h.tcpID.dstIP,
|
||||
ServerPort: h.tcpID.dstPort,
|
||||
IsOutgoing: h.isOutgoing,
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -239,7 +269,13 @@ func (h *httpReader) handleHTTP1ServerStream(b *bufio.Reader) error {
|
||||
reqResPair.Request.captureTime,
|
||||
reqResPair.Response.orig.(*http.Response),
|
||||
reqResPair.Response.captureTime,
|
||||
&reqResPair.Request.connectionInfo,
|
||||
&ConnectionInfo{
|
||||
ClientIP: h.tcpID.dstIP,
|
||||
ClientPort: h.tcpID.dstPort,
|
||||
ServerIP: h.tcpID.srcIP,
|
||||
ServerPort: h.tcpID.srcPort,
|
||||
IsOutgoing: h.isOutgoing,
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,13 +27,15 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
|
||||
SupportMissingEstablishment: *allowmissinginit,
|
||||
}
|
||||
Debug("Current App Ports: %v", gSettings.filterPorts)
|
||||
srcIp := net.Src().String()
|
||||
dstIp := net.Dst().String()
|
||||
dstPort := int(tcp.DstPort)
|
||||
|
||||
if factory.shouldNotifyOnOutboundLink(dstIp, dstPort) {
|
||||
factory.outbountLinkWriter.WriteOutboundLink(net.Src().String(), dstIp, dstPort)
|
||||
}
|
||||
isHTTP := factory.shouldTap(dstIp, dstPort)
|
||||
props := factory.getStreamProps(srcIp, dstIp, dstPort)
|
||||
isHTTP := props.isTapTarget
|
||||
stream := &tcpStream{
|
||||
net: net,
|
||||
transport: transport,
|
||||
@@ -57,6 +59,7 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
|
||||
hexdump: *hexdump,
|
||||
parent: stream,
|
||||
isClient: true,
|
||||
isOutgoing: props.isOutgoing,
|
||||
harWriter: factory.harWriter,
|
||||
}
|
||||
stream.server = httpReader{
|
||||
@@ -70,6 +73,7 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
|
||||
},
|
||||
hexdump: *hexdump,
|
||||
parent: stream,
|
||||
isOutgoing: props.isOutgoing,
|
||||
harWriter: factory.harWriter,
|
||||
}
|
||||
factory.wg.Add(2)
|
||||
@@ -84,28 +88,29 @@ func (factory *tcpStreamFactory) WaitGoRoutines() {
|
||||
factory.wg.Wait()
|
||||
}
|
||||
|
||||
func (factory *tcpStreamFactory) shouldTap(dstIP string, dstPort int) bool {
|
||||
func (factory *tcpStreamFactory) getStreamProps(srcIP string, dstIP string, dstPort int) *streamProps {
|
||||
if hostMode {
|
||||
if inArrayString(gSettings.filterAuthorities, fmt.Sprintf("%s:%d", dstIP, dstPort)) == true {
|
||||
return true
|
||||
return &streamProps{isTapTarget: true, isOutgoing: false}
|
||||
} else if inArrayString(gSettings.filterAuthorities, dstIP) == true {
|
||||
return true
|
||||
return &streamProps{isTapTarget: true, isOutgoing: false}
|
||||
} else if *anydirection && inArrayString(gSettings.filterAuthorities, srcIP) == true {
|
||||
return &streamProps{isTapTarget: true, isOutgoing: true}
|
||||
}
|
||||
return false
|
||||
return &streamProps{isTapTarget: false}
|
||||
} else {
|
||||
isTappedPort := dstPort == 80 || (gSettings.filterPorts != nil && (inArrayInt(gSettings.filterPorts, dstPort)))
|
||||
if !isTappedPort {
|
||||
return false
|
||||
return &streamProps{isTapTarget: false, isOutgoing: false}
|
||||
}
|
||||
|
||||
if !*anydirection {
|
||||
isDirectedHere := inArrayString(ownIps, dstIP)
|
||||
if !isDirectedHere {
|
||||
return false
|
||||
}
|
||||
isOutgoing := !inArrayString(ownIps, dstIP)
|
||||
|
||||
if !*anydirection && isOutgoing {
|
||||
return &streamProps{isTapTarget: false, isOutgoing: isOutgoing}
|
||||
}
|
||||
|
||||
return true
|
||||
return &streamProps{isTapTarget: true}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -116,3 +121,9 @@ func (factory *tcpStreamFactory) shouldNotifyOnOutboundLink(dstIP string, dstPor
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
type streamProps struct {
|
||||
isTapTarget bool
|
||||
isOutgoing bool
|
||||
}
|
||||
|
||||
|
||||
@@ -10,6 +10,8 @@
|
||||
display: flex
|
||||
align-items: center
|
||||
padding-left: 24px
|
||||
padding-right: 24px
|
||||
justify-content: space-between
|
||||
|
||||
.title
|
||||
font-size: 45px
|
||||
|
||||
@@ -1,18 +1,41 @@
|
||||
import React from 'react';
|
||||
import {HarPage} from "./components/HarPage";
|
||||
import React, {useState} from 'react';
|
||||
import './App.sass';
|
||||
import logo from './components/assets/Mizu.svg';
|
||||
import {Button} from "@material-ui/core";
|
||||
import {HarPage} from "./components/HarPage";
|
||||
|
||||
|
||||
const App = () => {
|
||||
return (
|
||||
<div className="mizuApp">
|
||||
<div className="header">
|
||||
<div className="title"><img src={logo} alt="logo"/></div>
|
||||
<div className="description">Traffic viewer for Kubernetes</div>
|
||||
|
||||
const [analyzeStatus, setAnalyzeStatus] = useState(null);
|
||||
|
||||
return (
|
||||
<div className="mizuApp">
|
||||
<div className="header">
|
||||
<div style={{display: "flex", alignItems: "center"}}>
|
||||
<div className="title"><img src={logo} alt="logo"/></div>
|
||||
<div className="description">Traffic viewer for Kubernetes</div>
|
||||
</div>
|
||||
<div>
|
||||
{analyzeStatus?.isAnalyzing &&
|
||||
<div
|
||||
title={!analyzeStatus?.isRemoteReady ? "Analysis is not ready yet" : "Go To see further analysis"}>
|
||||
<Button
|
||||
variant="contained"
|
||||
color="primary"
|
||||
disabled={!analyzeStatus?.isRemoteReady}
|
||||
onClick={() => {
|
||||
window.open(analyzeStatus?.remoteUrl)
|
||||
}}>
|
||||
Analysis
|
||||
</Button>
|
||||
</div>
|
||||
}
|
||||
</div>
|
||||
</div>
|
||||
<HarPage setAnalyzeStatus={setAnalyzeStatus}/>
|
||||
</div>
|
||||
<HarPage/>
|
||||
</div>
|
||||
);
|
||||
);
|
||||
}
|
||||
|
||||
export default App;
|
||||
|
||||
@@ -1,7 +1,13 @@
|
||||
import React from "react";
|
||||
import styles from './style/HarEntry.module.sass';
|
||||
import StatusCode from "./StatusCode";
|
||||
import StatusCode, {getClassification, StatusCodeClassification} from "./StatusCode";
|
||||
import {EndpointPath} from "./EndpointPath";
|
||||
import ingoingIconSuccess from "./assets/ingoing-traffic-success.svg"
|
||||
import ingoingIconFailure from "./assets/ingoing-traffic-failure.svg"
|
||||
import ingoingIconNeutral from "./assets/ingoing-traffic-neutral.svg"
|
||||
import outgoingIconSuccess from "./assets/outgoing-traffic-success.svg"
|
||||
import outgoingIconFailure from "./assets/outgoing-traffic-failure.svg"
|
||||
import outgoingIconNeutral from "./assets/outgoing-traffic-neutral.svg"
|
||||
|
||||
interface HAREntry {
|
||||
method?: string,
|
||||
@@ -12,6 +18,7 @@ interface HAREntry {
|
||||
url?: string;
|
||||
isCurrentRevision?: boolean;
|
||||
timestamp: Date;
|
||||
isOutgoing?: boolean;
|
||||
}
|
||||
|
||||
interface HAREntryProps {
|
||||
@@ -21,6 +28,26 @@ interface HAREntryProps {
|
||||
}
|
||||
|
||||
export const HarEntry: React.FC<HAREntryProps> = ({entry, setFocusedEntryId, isSelected}) => {
|
||||
const classification = getClassification(entry.statusCode)
|
||||
let ingoingIcon;
|
||||
let outgoingIcon;
|
||||
switch(classification) {
|
||||
case StatusCodeClassification.SUCCESS: {
|
||||
ingoingIcon = ingoingIconSuccess;
|
||||
outgoingIcon = outgoingIconSuccess;
|
||||
break;
|
||||
}
|
||||
case StatusCodeClassification.FAILURE: {
|
||||
ingoingIcon = ingoingIconFailure;
|
||||
outgoingIcon = outgoingIconFailure;
|
||||
break;
|
||||
}
|
||||
case StatusCodeClassification.NEUTRAL: {
|
||||
ingoingIcon = ingoingIconNeutral;
|
||||
outgoingIcon = outgoingIconNeutral;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return <>
|
||||
<div id={entry.id} className={`${styles.row} ${isSelected ? styles.rowSelected : ''}`} onClick={() => setFocusedEntryId(entry.id)}>
|
||||
@@ -33,7 +60,14 @@ export const HarEntry: React.FC<HAREntryProps> = ({entry, setFocusedEntryId, isS
|
||||
{entry.service}
|
||||
</div>
|
||||
</div>
|
||||
<div className={styles.directionContainer}>
|
||||
{entry.isOutgoing ?
|
||||
<img src={outgoingIcon} alt="outgoing traffic" title="outgoing"/>
|
||||
:
|
||||
<img src={ingoingIcon} alt="ingoing traffic" title="ingoing"/>
|
||||
}
|
||||
</div>
|
||||
<div className={styles.timestamp}>{new Date(+entry.timestamp)?.toLocaleString()}</div>
|
||||
</div>
|
||||
</>
|
||||
};
|
||||
};
|
||||
|
||||
@@ -35,7 +35,11 @@ enum ConnectionStatus {
|
||||
Paused
|
||||
}
|
||||
|
||||
export const HarPage: React.FC = () => {
|
||||
interface HarPageProps {
|
||||
setAnalyzeStatus: (status: any) => void;
|
||||
}
|
||||
|
||||
export const HarPage: React.FC<HarPageProps> = ({setAnalyzeStatus}) => {
|
||||
|
||||
const classes = useLayoutStyles();
|
||||
|
||||
@@ -60,21 +64,21 @@ export const HarPage: React.FC = () => {
|
||||
ws.current.onclose = () => setConnection(ConnectionStatus.Closed);
|
||||
}
|
||||
|
||||
if(ws.current) {
|
||||
if (ws.current) {
|
||||
ws.current.onmessage = e => {
|
||||
if(!e?.data) return;
|
||||
if (!e?.data) return;
|
||||
const message = JSON.parse(e.data);
|
||||
|
||||
switch (message.messageType) {
|
||||
case "entry":
|
||||
const entry = message.data
|
||||
if(connection === ConnectionStatus.Paused) {
|
||||
if (connection === ConnectionStatus.Paused) {
|
||||
setNoMoreDataBottom(false)
|
||||
return;
|
||||
}
|
||||
if(!focusedEntryId) setFocusedEntryId(entry.id)
|
||||
if (!focusedEntryId) setFocusedEntryId(entry.id)
|
||||
let newEntries = [...entries];
|
||||
if(entries.length === 1000) {
|
||||
if (entries.length === 1000) {
|
||||
newEntries = newEntries.splice(1);
|
||||
setNoMoreDataTop(false);
|
||||
}
|
||||
@@ -83,6 +87,9 @@ export const HarPage: React.FC = () => {
|
||||
case "status":
|
||||
setTappingStatus(message.tappingStatus);
|
||||
break
|
||||
case "analyzeStatus":
|
||||
setAnalyzeStatus(message.analyzeStatus);
|
||||
break
|
||||
default:
|
||||
console.error(`unsupported websocket message type, Got: ${message.messageType}`)
|
||||
}
|
||||
@@ -94,19 +101,23 @@ export const HarPage: React.FC = () => {
|
||||
fetch(`http://localhost:8899/api/tapStatus`)
|
||||
.then(response => response.json())
|
||||
.then(data => setTappingStatus(data));
|
||||
|
||||
fetch(`http://localhost:8899/api/analyzeStatus`)
|
||||
.then(response => response.json())
|
||||
.then(data => setAnalyzeStatus(data));
|
||||
}, []);
|
||||
|
||||
|
||||
useEffect(() => {
|
||||
if(!focusedEntryId) return;
|
||||
if (!focusedEntryId) return;
|
||||
setSelectedHarEntry(null)
|
||||
fetch(`http://localhost:8899/api/entries/${focusedEntryId}`)
|
||||
.then(response => response.json())
|
||||
.then(data => setSelectedHarEntry(data));
|
||||
},[focusedEntryId])
|
||||
}, [focusedEntryId])
|
||||
|
||||
const toggleConnection = () => {
|
||||
setConnection(connection === ConnectionStatus.Connected ? ConnectionStatus.Paused : ConnectionStatus.Connected );
|
||||
setConnection(connection === ConnectionStatus.Connected ? ConnectionStatus.Paused : ConnectionStatus.Connected);
|
||||
}
|
||||
|
||||
const getConnectionStatusClass = (isContainer) => {
|
||||
@@ -135,11 +146,12 @@ export const HarPage: React.FC = () => {
|
||||
return (
|
||||
<div className="HarPage">
|
||||
<div className="harPageHeader">
|
||||
<img style={{cursor: "pointer", marginRight: 15, height: 30}} alt="pause" src={connection === ConnectionStatus.Connected ? pauseIcon : playIcon} onClick={toggleConnection}/>
|
||||
<img style={{cursor: "pointer", marginRight: 15, height: 30}} alt="pause"
|
||||
src={connection === ConnectionStatus.Connected ? pauseIcon : playIcon} onClick={toggleConnection}/>
|
||||
<div className="connectionText">
|
||||
{getConnectionTitle()}
|
||||
<div className={"indicatorContainer " + getConnectionStatusClass(true)}>
|
||||
<div className={"indicator " + getConnectionStatusClass(false)} />
|
||||
<div className={"indicator " + getConnectionStatusClass(false)}/>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
@@ -169,7 +181,8 @@ export const HarPage: React.FC = () => {
|
||||
</div>
|
||||
</div>
|
||||
<div className={classes.details}>
|
||||
{selectedHarEntry && <HAREntryDetailed harEntry={selectedHarEntry} classes={{root: classes.harViewer}}/>}
|
||||
{selectedHarEntry &&
|
||||
<HAREntryDetailed harEntry={selectedHarEntry} classes={{root: classes.harViewer}}/>}
|
||||
</div>
|
||||
</div>}
|
||||
{tappingStatus?.pods != null && <StatusBar tappingStatus={tappingStatus}/>}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import React from "react";
|
||||
import styles from './style/StatusCode.module.sass';
|
||||
|
||||
enum StatusCodeClassification {
|
||||
export enum StatusCodeClassification {
|
||||
SUCCESS = "success",
|
||||
FAILURE = "failure",
|
||||
NEUTRAL = "neutral"
|
||||
@@ -14,6 +14,12 @@ interface HAREntryProps {
|
||||
|
||||
const StatusCode: React.FC<HAREntryProps> = ({statusCode}) => {
|
||||
|
||||
const classification = getClassification(statusCode)
|
||||
|
||||
return <span className={`${styles[classification]} ${styles.base}`}>{statusCode}</span>
|
||||
};
|
||||
|
||||
export function getClassification(statusCode: number): string {
|
||||
let classification = StatusCodeClassification.NEUTRAL;
|
||||
|
||||
if (statusCode >= 200 && statusCode <= 399) {
|
||||
@@ -22,7 +28,7 @@ const StatusCode: React.FC<HAREntryProps> = ({statusCode}) => {
|
||||
classification = StatusCodeClassification.FAILURE;
|
||||
}
|
||||
|
||||
return <span className={`${styles[classification]} ${styles.base}`}>{statusCode}</span>
|
||||
};
|
||||
return classification
|
||||
}
|
||||
|
||||
export default StatusCode;
|
||||
export default StatusCode;
|
||||
|
||||
5
ui/src/components/assets/ingoing-traffic-failure.svg
Normal file
5
ui/src/components/assets/ingoing-traffic-failure.svg
Normal file
@@ -0,0 +1,5 @@
|
||||
<svg width="20" height="20" viewBox="0 0 20 20" fill="none" xmlns="http://www.w3.org/2000/svg">
|
||||
<path fill-rule="evenodd" clip-rule="evenodd" d="M16.5175 11.1465C16.8392 10.8869 17 10.4434 17 10C17 9.55657 16.8392 9.11314 16.5175 8.85348L12.5425 5.64459C13.2682 5.23422 14.1067 5 15 5C17.7614 5 20 7.23858 20 10C20 12.7614 17.7614 15 15 15C14.1067 15 13.2682 14.7658 12.5425 14.3554L16.5175 11.1465Z" fill="#BCCEFD"/>
|
||||
<path d="M16 10C16 10.3167 15.8749 10.6335 15.6247 10.8189L10.1706 14.8624C9.65543 15.2444 9 14.7858 9 14.0435V5.95652C9 5.21417 9.65543 4.75564 10.1706 5.13758L15.6247 9.18106C15.8749 9.36653 16 9.68326 16 10Z" fill="#EB5757"/>
|
||||
<path d="M0 10C0 8.89543 0.895431 8 2 8H10C11.1046 8 12 8.89543 12 10C12 11.1046 11.1046 12 10 12H2C0.895431 12 0 11.1046 0 10Z" fill="#EB5757"/>
|
||||
</svg>
|
||||
|
After Width: | Height: | Size: 800 B |
5
ui/src/components/assets/ingoing-traffic-neutral.svg
Normal file
5
ui/src/components/assets/ingoing-traffic-neutral.svg
Normal file
@@ -0,0 +1,5 @@
|
||||
<svg width="20" height="20" viewBox="0 0 20 20" fill="none" xmlns="http://www.w3.org/2000/svg">
|
||||
<path fill-rule="evenodd" clip-rule="evenodd" d="M16.5175 11.1465C16.8392 10.8869 17 10.4434 17 10C17 9.55657 16.8392 9.11314 16.5175 8.85348L12.5425 5.64459C13.2682 5.23422 14.1067 5 15 5C17.7614 5 20 7.23858 20 10C20 12.7614 17.7614 15 15 15C14.1067 15 13.2682 14.7658 12.5425 14.3554L16.5175 11.1465Z" fill="#BCCEFD"/>
|
||||
<path d="M16 10C16 10.3167 15.8749 10.6335 15.6247 10.8189L10.1706 14.8624C9.65543 15.2444 9 14.7858 9 14.0435V5.95652C9 5.21417 9.65543 4.75564 10.1706 5.13758L15.6247 9.18106C15.8749 9.36653 16 9.68326 16 10Z" fill="gray"/>
|
||||
<path d="M0 10C0 8.89543 0.895431 8 2 8H10C11.1046 8 12 8.89543 12 10C12 11.1046 11.1046 12 10 12H2C0.895431 12 0 11.1046 0 10Z" fill="gray"/>
|
||||
</svg>
|
||||
|
After Width: | Height: | Size: 794 B |
5
ui/src/components/assets/ingoing-traffic-success.svg
Normal file
5
ui/src/components/assets/ingoing-traffic-success.svg
Normal file
@@ -0,0 +1,5 @@
|
||||
<svg width="20" height="20" viewBox="0 0 20 20" fill="none" xmlns="http://www.w3.org/2000/svg">
|
||||
<path fill-rule="evenodd" clip-rule="evenodd" d="M16.5175 11.1465C16.8392 10.8869 17 10.4434 17 10C17 9.55657 16.8392 9.11314 16.5175 8.85348L12.5425 5.64459C13.2682 5.23422 14.1067 5 15 5C17.7614 5 20 7.23858 20 10C20 12.7614 17.7614 15 15 15C14.1067 15 13.2682 14.7658 12.5425 14.3554L16.5175 11.1465Z" fill="#BCCEFD"/>
|
||||
<path d="M16 10C16 10.3167 15.8749 10.6335 15.6247 10.8189L10.1706 14.8624C9.65543 15.2444 9 14.7858 9 14.0435V5.95652C9 5.21417 9.65543 4.75564 10.1706 5.13758L15.6247 9.18106C15.8749 9.36653 16 9.68326 16 10Z" fill="#27AE60"/>
|
||||
<path d="M0 10C0 8.89543 0.895431 8 2 8H10C11.1046 8 12 8.89543 12 10C12 11.1046 11.1046 12 10 12H2C0.895431 12 0 11.1046 0 10Z" fill="#27AE60"/>
|
||||
</svg>
|
||||
|
After Width: | Height: | Size: 800 B |
5
ui/src/components/assets/outgoing-traffic-failure.svg
Normal file
5
ui/src/components/assets/outgoing-traffic-failure.svg
Normal file
@@ -0,0 +1,5 @@
|
||||
<svg width="20" height="20" viewBox="0 0 20 20" fill="none" xmlns="http://www.w3.org/2000/svg">
|
||||
<path fill-rule="evenodd" clip-rule="evenodd" d="M15 15C17.7614 15 20 12.7615 20 10C20 7.23861 17.7614 5.00003 15 5.00003C13.3642 5.00003 11.9118 5.78558 10.9996 7.00003H14C15.6569 7.00003 17 8.34318 17 10C17 11.6569 15.6569 13 14 13H10.9996C11.9118 14.2145 13.3642 15 15 15Z" fill="#BCCEFD"/>
|
||||
<rect x="4" y="8.00003" width="12" height="4" rx="2" fill="#EB5757"/>
|
||||
<path d="M5.96244e-08 10C6.34015e-08 9.68329 0.125088 9.36656 0.375266 9.18109L5.82939 5.13761C6.34457 4.75567 7 5.2142 7 5.95655L7 14.0435C7 14.7859 6.34457 15.2444 5.82939 14.8625L0.375266 10.819C0.125088 10.6335 5.58474e-08 10.3168 5.96244e-08 10Z" fill="#EB5757"/>
|
||||
</svg>
|
||||
|
After Width: | Height: | Size: 736 B |
5
ui/src/components/assets/outgoing-traffic-neutral.svg
Normal file
5
ui/src/components/assets/outgoing-traffic-neutral.svg
Normal file
@@ -0,0 +1,5 @@
|
||||
<svg width="20" height="20" viewBox="0 0 20 20" fill="none" xmlns="http://www.w3.org/2000/svg">
|
||||
<path fill-rule="evenodd" clip-rule="evenodd" d="M15 15C17.7614 15 20 12.7615 20 10C20 7.23861 17.7614 5.00003 15 5.00003C13.3642 5.00003 11.9118 5.78558 10.9996 7.00003H14C15.6569 7.00003 17 8.34318 17 10C17 11.6569 15.6569 13 14 13H10.9996C11.9118 14.2145 13.3642 15 15 15Z" fill="#BCCEFD"/>
|
||||
<rect x="4" y="8.00003" width="12" height="4" rx="2" fill="gray"/>
|
||||
<path d="M5.96244e-08 10C6.34015e-08 9.68329 0.125088 9.36656 0.375266 9.18109L5.82939 5.13761C6.34457 4.75567 7 5.2142 7 5.95655L7 14.0435C7 14.7859 6.34457 15.2444 5.82939 14.8625L0.375266 10.819C0.125088 10.6335 5.58474e-08 10.3168 5.96244e-08 10Z" fill="gray"/>
|
||||
</svg>
|
||||
|
After Width: | Height: | Size: 730 B |
5
ui/src/components/assets/outgoing-traffic-success.svg
Normal file
5
ui/src/components/assets/outgoing-traffic-success.svg
Normal file
@@ -0,0 +1,5 @@
|
||||
<svg width="20" height="20" viewBox="0 0 20 20" fill="none" xmlns="http://www.w3.org/2000/svg">
|
||||
<path fill-rule="evenodd" clip-rule="evenodd" d="M15 15C17.7614 15 20 12.7615 20 10C20 7.23861 17.7614 5.00003 15 5.00003C13.3642 5.00003 11.9118 5.78558 10.9996 7.00003H14C15.6569 7.00003 17 8.34318 17 10C17 11.6569 15.6569 13 14 13H10.9996C11.9118 14.2145 13.3642 15 15 15Z" fill="#BCCEFD"/>
|
||||
<rect x="4" y="8.00003" width="12" height="4" rx="2" fill="#27AE60"/>
|
||||
<path d="M5.96244e-08 10C6.34015e-08 9.68329 0.125088 9.36656 0.375266 9.18109L5.82939 5.13761C6.34457 4.75567 7 5.2142 7 5.95655L7 14.0435C7 14.7859 6.34457 15.2444 5.82939 14.8625L0.375266 10.819C0.125088 10.6335 5.58474e-08 10.3168 5.96244e-08 10Z" fill="#27AE60"/>
|
||||
</svg>
|
||||
|
After Width: | Height: | Size: 736 B |
@@ -37,9 +37,10 @@
|
||||
.timestamp
|
||||
font-size: 12px
|
||||
color: $secondary-font-color
|
||||
padding-left: 8px
|
||||
padding-right: 8px
|
||||
padding-left: 12px
|
||||
flex-shrink: 0
|
||||
width: 145px
|
||||
text-align: left
|
||||
|
||||
.endpointServiceContainer
|
||||
display: flex
|
||||
@@ -47,4 +48,10 @@
|
||||
overflow: hidden
|
||||
padding-right: 10px
|
||||
padding-left: 10px
|
||||
flex-grow: 1
|
||||
flex-grow: 1
|
||||
|
||||
.directionContainer
|
||||
display: flex
|
||||
border-right: 1px solid $data-background-color
|
||||
padding: 4px
|
||||
padding-right: 12px
|
||||
|
||||
@@ -19,4 +19,4 @@ $blue-gray: #494677;
|
||||
successColor: $success-color;
|
||||
failureColor: $failure-color;
|
||||
blueGray: $blue-gray;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user