Compare commits

...

3 Commits

Author SHA1 Message Date
Igor Gov
793bb97e51 Print Mizu URL only after the watched pods (#156)
* Print Mizu URL after the watched pods
2021-08-03 08:54:10 +03:00
RamiBerm
ceb8d714e3 TRA-3420 inform user of tls traffic (#149)
* Update passive_tapper.go and tls_utils.go

* Update go.mod, go.sum, and 18 more files...

* go fmt

* Update http_reader.go, passive_tapper.go, and 3 more files...

* Update status_controller.go and status_provider.go

Co-authored-by: RamiBerm <rami.berman@up9.com>
2021-08-01 14:57:43 +03:00
gadotroee
8d8310ee02 Revert "feature/TRA_3427_demo_mode (#150)" (#151)
This reverts commit 71eff5ea04.
2021-07-29 21:06:41 +03:00
25 changed files with 23102 additions and 416 deletions

View File

@@ -22,6 +22,7 @@ require (
k8s.io/api v0.21.0
k8s.io/apimachinery v0.21.0
k8s.io/client-go v0.21.0
github.com/patrickmn/go-cache v2.1.0+incompatible
)
replace github.com/up9inc/mizu/shared v0.0.0 => ../shared

View File

@@ -45,6 +45,8 @@ github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:l
github.com/aws/aws-sdk-go v1.34.28/go.mod h1:H7NKnBqNVzoTJpGfLrQkkD+ytBA93eiDYi/+8rV9s48=
github.com/beevik/etree v1.1.0 h1:T0xke/WvNtMoCqgzPhkX2r4rjY3GDZFi+FjpRZY2Jbs=
github.com/beevik/etree v1.1.0/go.mod h1:r8Aw8JqVegEf0w2fDnATrX9VpkMcyFeM0FhwO62wh+A=
github.com/bradleyfalzon/tlsx v0.0.0-20170624122154-28fd0e59bac4 h1:NJOOlc6ZJjix0A1rAU+nxruZtR8KboG1848yqpIUo4M=
github.com/bradleyfalzon/tlsx v0.0.0-20170624122154-28fd0e59bac4/go.mod h1:DQPxZS994Ld1Y8uwnJT+dRL04XPD0cElP/pHH/zEBHM=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
@@ -245,6 +247,8 @@ github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGV
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231 h1:fa50YL1pzKW+1SsBnJDOHppJN9stOEwS+CRWyUtyYGU=
github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE=
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=

View File

@@ -4,6 +4,12 @@ import (
"encoding/json"
"flag"
"fmt"
"github.com/gin-contrib/static"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/romana/rlog"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/tap"
"mizuserver/pkg/api"
"mizuserver/pkg/models"
"mizuserver/pkg/routes"
@@ -13,17 +19,9 @@ import (
"os"
"os/signal"
"strings"
"github.com/gin-contrib/static"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/romana/rlog"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/tap"
)
var shouldTap = flag.Bool("tap", false, "Run in tapper mode without API")
var demo = flag.Bool("demo", false, "Run in Demo mode with API")
var apiServer = flag.Bool("api-server", false, "Run in API server mode with API")
var standalone = flag.Bool("standalone", false, "Run in standalone tapper and API mode")
var apiServerAddress = flag.String("api-server-address", "", "Address of mizu API server")
@@ -36,12 +34,13 @@ func main() {
if !*shouldTap && !*apiServer && !*standalone {
panic("One of the flags --tap, --api or --standalone must be provided")
}
if *standalone {
harOutputChannel, outboundLinkOutputChannel := tap.StartPassiveTapper(tapOpts)
filteredHarChannel := make(chan *tap.OutputChannelItem)
go filterHarItems(harOutputChannel, filteredHarChannel, getTrafficFilteringOptions())
go api.StartReadingEntries(filteredHarChannel, nil, false)
go api.StartReadingEntries(filteredHarChannel, nil)
go api.StartReadingOutbound(outboundLinkOutputChannel)
hostApi(nil)
@@ -63,19 +62,14 @@ func main() {
panic(fmt.Sprintf("Error connecting to socket server at %s %v", *apiServerAddress, err))
}
go pipeChannelToSocket(socketConnection, harOutputChannel)
go api.StartReadingOutbound(outboundLinkOutputChannel)
go pipeTapChannelToSocket(socketConnection, harOutputChannel)
go pipeOutboundLinksChannelToSocket(socketConnection, outboundLinkOutputChannel)
} else if *apiServer {
socketHarOutChannel := make(chan *tap.OutputChannelItem, 1000)
filteredHarChannel := make(chan *tap.OutputChannelItem)
go filterHarItems(socketHarOutChannel, filteredHarChannel, getTrafficFilteringOptions())
if *demo {
workdir := "./hars"
go api.StartReadingEntries(filteredHarChannel, &workdir, true)
} else {
go api.StartReadingEntries(filteredHarChannel, nil, false)
}
go api.StartReadingEntries(filteredHarChannel, nil)
hostApi(socketHarOutChannel)
}
@@ -183,7 +177,7 @@ func isHealthCheckByUserAgent(message *tap.OutputChannelItem) bool {
return false
}
func pipeChannelToSocket(connection *websocket.Conn, messageDataChannel <-chan *tap.OutputChannelItem) {
func pipeTapChannelToSocket(connection *websocket.Conn, messageDataChannel <-chan *tap.OutputChannelItem) {
if connection == nil {
panic("Websocket connection is nil")
}
@@ -206,3 +200,21 @@ func pipeChannelToSocket(connection *websocket.Conn, messageDataChannel <-chan *
}
}
}
func pipeOutboundLinksChannelToSocket(connection *websocket.Conn, outboundLinkChannel <-chan *tap.OutboundLink) {
for outboundLink := range outboundLinkChannel {
if outboundLink.SuggestedProtocol == tap.TLSProtocol {
marshaledData, err := models.CreateWebsocketOutboundLinkMessage(outboundLink)
if err != nil {
rlog.Infof("Error converting outbound link to json %s, (%v,%+v)", err, err, err)
continue
}
err = connection.WriteMessage(websocket.TextMessage, marshaledData)
if err != nil {
rlog.Infof("error sending outbound link message through socket server %s, (%v,%+v)", err, err, err)
continue
}
}
}
}

View File

@@ -5,7 +5,10 @@ import (
"context"
"encoding/json"
"fmt"
"math/rand"
"github.com/google/martian/har"
"github.com/romana/rlog"
"github.com/up9inc/mizu/tap"
"go.mongodb.org/mongo-driver/bson/primitive"
"mizuserver/pkg/holder"
"net/url"
"os"
@@ -14,11 +17,6 @@ import (
"strings"
"time"
"github.com/google/martian/har"
"github.com/romana/rlog"
"github.com/up9inc/mizu/tap"
"go.mongodb.org/mongo-driver/bson/primitive"
"mizuserver/pkg/database"
"mizuserver/pkg/models"
"mizuserver/pkg/resolver"
@@ -49,15 +47,15 @@ func init() {
holder.SetResolver(res)
}
func StartReadingEntries(harChannel <-chan *tap.OutputChannelItem, workingDir *string, demo bool) {
func StartReadingEntries(harChannel <-chan *tap.OutputChannelItem, workingDir *string) {
if workingDir != nil && *workingDir != "" {
startReadingFiles(*workingDir, demo)
startReadingFiles(*workingDir)
} else {
startReadingChannel(harChannel)
}
}
func startReadingFiles(workingDir string, infiniteLoad bool) {
func startReadingFiles(workingDir string) {
err := os.MkdirAll(workingDir, os.ModePerm)
utils.CheckErr(err)
@@ -88,23 +86,18 @@ func startReadingFiles(workingDir string, infiniteLoad bool) {
utils.CheckErr(decErr)
for _, entry := range inputHar.Log.Entries {
if infiniteLoad {
entry.StartedDateTime = time.Now().Add(20 * time.Millisecond)
}
time.Sleep(time.Millisecond * time.Duration(rand.Intn(300)))
time.Sleep(time.Millisecond * 250)
connectionInfo := &tap.ConnectionInfo{
ClientIP: fileInfo.Name(),
ClientIP: fileInfo.Name(),
ClientPort: "",
ServerIP: "",
ServerIP: "",
ServerPort: "",
IsOutgoing: false,
}
saveHarToDb(entry, connectionInfo)
}
if !infiniteLoad {
rmErr := os.Remove(inputFilePath)
utils.CheckErr(rmErr)
}
rmErr := os.Remove(inputFilePath)
utils.CheckErr(rmErr)
}
}
@@ -125,6 +118,7 @@ func StartReadingOutbound(outboundLinkChannel <-chan *tap.OutboundLink) {
}
}
func saveHarToDb(entry *har.Entry, connectionInfo *tap.ConnectionInfo) {
entryBytes, _ := json.Marshal(entry)
serviceName, urlPath := getServiceNameFromUrl(entry.Request.URL)
@@ -202,5 +196,6 @@ func getEstimatedEntrySizeBytes(mizuEntry models.MizuEntry) int {
sizeBytes += 8 // SizeBytes bytes
sizeBytes += 1 // IsOutgoing bytes
return sizeBytes
}

View File

@@ -6,8 +6,8 @@ import (
"github.com/romana/rlog"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/tap"
"mizuserver/pkg/controllers"
"mizuserver/pkg/models"
"mizuserver/pkg/providers"
"mizuserver/pkg/routes"
"mizuserver/pkg/up9"
"sync"
@@ -80,15 +80,46 @@ func (h *RoutesEventHandlers) WebSocketMessage(_ int, message []byte) {
if err != nil {
rlog.Infof("Could not unmarshal message of message type %s %v\n", socketMessageBase.MessageType, err)
} else {
controllers.TapStatus = statusMessage.TappingStatus
providers.TapStatus.Pods = statusMessage.TappingStatus.Pods
broadcastToBrowserClients(message)
}
case shared.WebsocketMessageTypeOutboundLink:
var outboundLinkMessage models.WebsocketOutboundLinkMessage
err := json.Unmarshal(message, &outboundLinkMessage)
if err != nil {
rlog.Infof("Could not unmarshal message of message type %s %v\n", socketMessageBase.MessageType, err)
} else {
handleTLSLink(outboundLinkMessage)
}
default:
rlog.Infof("Received socket message of type %s for which no handlers are defined", socketMessageBase.MessageType)
}
}
}
func handleTLSLink(outboundLinkMessage models.WebsocketOutboundLinkMessage) {
resolvedName := k8sResolver.Resolve(outboundLinkMessage.Data.DstIP)
if resolvedName != "" {
outboundLinkMessage.Data.DstIP = resolvedName
} else if outboundLinkMessage.Data.SuggestedResolvedName != "" {
outboundLinkMessage.Data.DstIP = outboundLinkMessage.Data.SuggestedResolvedName
}
cacheKey := fmt.Sprintf("%s -> %s:%d", outboundLinkMessage.Data.Src, outboundLinkMessage.Data.DstIP, outboundLinkMessage.Data.DstPort)
_, isInCache := providers.RecentTLSLinks.Get(cacheKey)
if isInCache {
return
} else {
providers.RecentTLSLinks.SetDefault(cacheKey, outboundLinkMessage.Data)
}
marshaledMessage, err := json.Marshal(outboundLinkMessage)
if err != nil {
rlog.Errorf("Error marshaling outbound link message for broadcasting: %v", err)
} else {
fmt.Printf("Broadcasting outboundlink message %s\n", string(marshaledMessage))
broadcastToBrowserClients(marshaledMessage)
}
}
func removeSocketUUIDFromBrowserSlice(uuidToRemove int) {
newUUIDSlice := make([]int, 0, len(browserClientSocketUUIDs))
for _, uuid := range browserClientSocketUUIDs {

View File

@@ -2,17 +2,19 @@ package controllers
import (
"github.com/gin-gonic/gin"
"github.com/up9inc/mizu/shared"
"mizuserver/pkg/providers"
"mizuserver/pkg/up9"
"net/http"
)
var TapStatus shared.TapStatus
func GetTappingStatus(c *gin.Context) {
c.JSON(http.StatusOK, TapStatus)
c.JSON(http.StatusOK, providers.TapStatus)
}
func AnalyzeInformation(c *gin.Context) {
c.JSON(http.StatusOK, up9.GetAnalyzeInfo())
}
func GetRecentTLSLinks(c *gin.Context) {
c.JSON(http.StatusOK, providers.GetAllRecentTLSAddresses())
}

View File

@@ -132,6 +132,11 @@ type WebSocketTappedEntryMessage struct {
Data *tap.OutputChannelItem
}
type WebsocketOutboundLinkMessage struct {
*shared.WebSocketMessageMetadata
Data *tap.OutboundLink
}
func CreateBaseEntryWebSocketMessage(base *BaseEntryDetails) ([]byte, error) {
message := &WebSocketEntryMessage{
WebSocketMessageMetadata: &shared.WebSocketMessageMetadata{
@@ -152,6 +157,16 @@ func CreateWebsocketTappedEntryMessage(base *tap.OutputChannelItem) ([]byte, err
return json.Marshal(message)
}
func CreateWebsocketOutboundLinkMessage(base *tap.OutboundLink) ([]byte, error) {
message := &WebsocketOutboundLinkMessage{
WebSocketMessageMetadata: &shared.WebSocketMessageMetadata{
MessageType: shared.WebsocketMessageTypeOutboundLink,
},
Data: base,
}
return json.Marshal(message)
}
// ExtendedHAR is the top level object of a HAR log.
type ExtendedHAR struct {
Log *ExtendedLog `json:"log"`

View File

@@ -0,0 +1,28 @@
package providers
import (
"github.com/patrickmn/go-cache"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/tap"
"time"
)
const tlsLinkRetainmentTime = time.Minute * 15
var (
TapStatus shared.TapStatus
RecentTLSLinks = cache.New(tlsLinkRetainmentTime, tlsLinkRetainmentTime)
)
func GetAllRecentTLSAddresses() []string {
recentTLSLinks := make([]string, 0)
for _, outboundLinkItem := range RecentTLSLinks.Items() {
outboundLink, castOk := outboundLinkItem.Object.(*tap.OutboundLink)
if castOk {
recentTLSLinks = append(recentTLSLinks, outboundLink.DstIP)
}
}
return recentTLSLinks
}

View File

@@ -22,4 +22,5 @@ func EntriesRoutes(ginApp *gin.Engine) {
routeGroup.GET("/tapStatus", controllers.GetTappingStatus) // get tapping status
routeGroup.GET("/analyzeStatus", controllers.AnalyzeInformation)
routeGroup.GET("/recentTLSLinks", controllers.GetRecentTLSLinks)
}

View File

@@ -1,32 +0,0 @@
package cmd
import (
"github.com/spf13/cobra"
)
type MizuDemoOptions struct {
GuiPort uint16
Analyze bool
AnalyzeDestination string
}
var mizuDemoOptions = &MizuDemoOptions{}
var demoCmd = &cobra.Command{
Use: "demo",
Short: "Record ingoing traffic of a kubernetes pod",
Long: `Record the ingoing traffic of a kubernetes pod.
Supported protocols are HTTP and gRPC.`,
RunE: func(cmd *cobra.Command, args []string) error {
RunMizuTapDemo(mizuDemoOptions)
return nil
},
}
func init() {
rootCmd.AddCommand(demoCmd)
demoCmd.Flags().Uint16VarP(&mizuDemoOptions.GuiPort, "gui-port", "p", 8899, "Provide a custom port for the web interface webserver")
demoCmd.Flags().BoolVar(&mizuDemoOptions.Analyze, "analyze", false, "Uploads traffic to UP9 cloud for further analysis (Beta)")
demoCmd.Flags().StringVar(&mizuDemoOptions.AnalyzeDestination, "dest", "up9.app", "Destination environment")
}

View File

@@ -1,185 +0,0 @@
package cmd
import (
"archive/zip"
"bytes"
"context"
"fmt"
"io"
"log"
"net/http"
"os"
"os/exec"
"os/signal"
"path/filepath"
"runtime"
"strings"
"syscall"
"time"
"github.com/up9inc/mizu/cli/uiUtils"
)
func RunMizuTapDemo(demoOptions *MizuDemoOptions) {
dir, _ := os.Getwd()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
downloadMizuDemo(dir)
go callMizuDemo(ctx, cancel, dir, demoOptions)
if demoOptions.Analyze {
go analyze(demoOptions)
fmt.Printf(uiUtils.Purple, "mizu tap \"catalogue-.*|carts-[0-9].*|payment.*|shipping.*|user-[0-9].*\" -n sock-shop --analyze\n")
} else {
fmt.Printf(uiUtils.Purple, "mizu tap \"catalogue-.*|carts-[0-9].*|payment.*|shipping.*|user-[0-9].*\" -n sock-shop\n")
}
fmt.Println("Mizu will be available on http://localhost:8899 in a few seconds")
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
select {
case <-ctx.Done():
break
case <-sigChan:
cleanUpDemoResources(dir)
cancel()
}
}
func cleanUpDemoResources(dir string) {
removeFile(fmt.Sprintf("%s/site.zip", dir))
removeFile(fmt.Sprintf("%s/site", dir))
removeFile(fmt.Sprintf("%s/apiserver.zip", dir))
removeFile(fmt.Sprintf("%s/apiserver", dir))
removeFile(fmt.Sprintf("%s/entries.db", dir))
removeFile(fmt.Sprintf("%s/hars", dir))
removeFile(fmt.Sprintf("%s/hars.zip", dir))
}
func removeFile(file string) {
err := os.RemoveAll(file)
if err != nil {
log.Fatal(err)
}
}
func downloadMizuDemo(dir string) {
if runtime.GOOS != "darwin" && runtime.GOOS != "linux" {
panic("Platform not supported")
}
mizuApiURL := fmt.Sprintf("https://storage.googleapis.com/up9-mizu-demo-mode/apiserver-%s.zip", runtime.GOOS)
siteFileURL := "https://storage.googleapis.com/up9-mizu-demo-mode/site.zip"
harsURL := "https://storage.googleapis.com/up9-mizu-demo-mode/hars.zip"
dirApi := fmt.Sprintf("%s/apiserver.zip", dir)
dirSite := fmt.Sprintf("%s/site.zip", dir)
dirHars := fmt.Sprintf("%s/hars.zip", dir)
DownloadFile(dirApi, mizuApiURL)
DownloadFile(dirSite, siteFileURL)
DownloadFile(dirHars, harsURL)
UnzipSite(dirSite, fmt.Sprintf("%s/", dir))
UnzipSite(dirApi, fmt.Sprintf("%s/", dir))
UnzipSite(dirHars, fmt.Sprintf("%s/", dir))
allowExecutable(fmt.Sprintf("%s/apiserver", dir))
}
func DownloadFile(filepath string, url string) error {
resp, err := http.Get(url)
if err != nil {
return err
}
defer resp.Body.Close()
out, err := os.Create(filepath)
if err != nil {
return err
}
defer out.Close()
_, err = io.Copy(out, resp.Body)
return err
}
func UnzipSite(src string, dest string) ([]string, error) {
var filenames []string
r, err := zip.OpenReader(src)
if err != nil {
return filenames, err
}
defer r.Close()
for _, f := range r.File {
fpath := filepath.Join(dest, f.Name)
if !strings.HasPrefix(fpath, filepath.Clean(dest)+string(os.PathSeparator)) {
return filenames, fmt.Errorf("%s: illegal file path", fpath)
}
filenames = append(filenames, fpath)
if f.FileInfo().IsDir() {
os.MkdirAll(fpath, os.ModePerm)
continue
}
if err = os.MkdirAll(filepath.Dir(fpath), os.ModePerm); err != nil {
return filenames, err
}
outFile, err := os.OpenFile(fpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode())
if err != nil {
return filenames, err
}
rc, err := f.Open()
if err != nil {
return filenames, err
}
_, err = io.Copy(outFile, rc)
outFile.Close()
rc.Close()
if err != nil {
return filenames, err
}
}
return filenames, nil
}
func allowExecutable(dir string) {
if err := os.Chmod(dir, 0755); err != nil {
log.Fatalln(err)
}
}
func callMizuDemo(ctx context.Context, cancel context.CancelFunc, dir string, demoOptions *MizuDemoOptions) {
cmd := exec.Command(fmt.Sprintf("%s/apiserver", dir), "--aggregator", "--demo")
var out bytes.Buffer
// set the output to our variable
cmd.Stdout = &out
err := cmd.Run()
if err != nil {
log.Println(err)
}
}
func analyze(demoOptions *MizuDemoOptions) {
mizuProxiedUrl := getMizuCollectorProxiedHostAndPath(demoOptions.GuiPort)
for {
response, err := http.Get(fmt.Sprintf("http://%s/api/uploadEntries?dest=%s&interval=10", mizuProxiedUrl, demoOptions.AnalyzeDestination))
if err != nil || response.StatusCode != 200 {
fmt.Printf(uiUtils.Red, "Mizu Not running, waiting 10 seconds before trying again\n")
} else {
fmt.Printf(uiUtils.Purple, "Traffic is uploading to UP9 cloud for further analsys\n")
break
}
time.Sleep(10 * time.Second)
}
}
func getMizuCollectorProxiedHostAndPath(mizuPort uint16) string {
return fmt.Sprintf("localhost:%d", mizuPort)
}

View File

@@ -86,9 +86,10 @@ func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) {
return
}
urlReadyChan := make(chan string)
mizu.CheckNewerVersion()
go portForwardApiPod(ctx, kubernetesProvider, cancel, tappingOptions) // TODO convert this to job for built in pod ttl or have the running app handle this
go watchPodsForTapping(ctx, kubernetesProvider, cancel, podRegexQuery, tappingOptions)
go portForwardApiPod(ctx, kubernetesProvider, cancel, tappingOptions, urlReadyChan) // TODO convert this to job for built in pod ttl or have the running app handle this
go watchPodsForTapping(ctx, kubernetesProvider, cancel, podRegexQuery, tappingOptions, urlReadyChan)
go syncApiStatus(ctx, cancel, tappingOptions)
//block until exit signal or error
@@ -231,7 +232,7 @@ func cleanUpMizuResources(kubernetesProvider *kubernetes.Provider) {
}
}
func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, podRegex *regexp.Regexp, tappingOptions *MizuTapOptions) {
func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, podRegex *regexp.Regexp, tappingOptions *MizuTapOptions, urlReadyChan chan string) {
targetNamespace := getNamespace(tappingOptions, kubernetesProvider)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, targetNamespace), podRegex)
@@ -256,16 +257,20 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
}
}
restartTappersDebouncer := debounce.NewDebouncer(updateTappersDelay, restartTappers)
timer := time.AfterFunc(time.Second*10, func() {
mizu.Log.Debugf("Waiting for URL...")
mizu.Log.Infof("Mizu is available at http://%s", <-urlReadyChan)
})
for {
select {
case newTarget := <-added:
mizu.Log.Infof(uiUtils.Green, fmt.Sprintf("+%s", newTarget.Name))
timer.Reset(time.Second * 2)
case removedTarget := <-removed:
mizu.Log.Infof(uiUtils.Red, fmt.Sprintf("-%s", removedTarget.Name))
timer.Reset(time.Second * 2)
restartTappersDebouncer.SetOn()
case modifiedTarget := <-modified:
// Act only if the modified pod has already obtained an IP address.
// After filtering for IPs, on a normal pod restart this includes the following events:
@@ -287,7 +292,7 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
}
}
func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, tappingOptions *MizuTapOptions) {
func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, tappingOptions *MizuTapOptions, urlReadyChan chan string) {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", mizu.ApiServerPodName))
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, mizu.ResourcesNamespace), podExactRegex)
isPodReady := false
@@ -309,33 +314,18 @@ func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
go func() {
err := kubernetes.StartProxy(kubernetesProvider, tappingOptions.GuiPort, mizu.ResourcesNamespace, mizu.ApiServerPodName)
if err != nil {
mizu.Log.Infof("Error occured while running k8s proxy %v", err)
mizu.Log.Infof("Error occurred while running k8s proxy %v", err)
cancel()
}
}()
mizuProxiedUrl := kubernetes.GetMizuApiServerProxiedHostAndPath(tappingOptions.GuiPort)
mizu.Log.Infof("Mizu is available at http://%s", mizuProxiedUrl)
time.Sleep(time.Second * 5) // Waiting to be sure the proxy is ready
if tappingOptions.Analysis {
urlPath := fmt.Sprintf("http://%s/api/uploadEntries?dest=%s&interval=%v", mizuProxiedUrl, url.QueryEscape(tappingOptions.AnalysisDestination), tappingOptions.SleepIntervalSec)
u, err := url.ParseRequestURI(urlPath)
if err != nil {
log.Fatal(fmt.Sprintf("Failed parsing the URL %v\n", err))
}
mizu.Log.Debugf("Sending get request to %v", u.String())
if response, err := http.Get(u.String()); err != nil || response.StatusCode != 200 {
mizu.Log.Infof("error sending upload entries req, status code: %v, err: %v", response.StatusCode, err)
} else {
mizu.Log.Infof(uiUtils.Purple, "Traffic is uploading to UP9 for further analysis")
}
}
}
urlReadyChan <- kubernetes.GetMizuApiServerProxiedHostAndPath(tappingOptions.GuiPort)
time.Sleep(time.Second * 5) // Waiting to be sure the proxy is ready
requestForAnalysis(tappingOptions)
case <-timeAfter:
if !isPodReady {
mizu.Log.Infof("error: %s pod was not ready in time", mizu.ApiServerPodName)
mizu.Log.Errorf("error: %s pod was not ready in time", mizu.ApiServerPodName)
cancel()
}
@@ -345,6 +335,25 @@ func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
}
}
func requestForAnalysis(tappingOptions *MizuTapOptions) {
if !tappingOptions.Analysis {
return
}
mizuProxiedUrl := kubernetes.GetMizuApiServerProxiedHostAndPath(tappingOptions.GuiPort)
urlPath := fmt.Sprintf("http://%s/api/uploadEntries?dest=%s&interval=%v", mizuProxiedUrl, url.QueryEscape(tappingOptions.AnalysisDestination), tappingOptions.SleepIntervalSec)
u, err := url.ParseRequestURI(urlPath)
if err != nil {
log.Fatal(fmt.Sprintf("Failed parsing the URL %v\n", err))
}
mizu.Log.Debugf("Sending get request to %v", u.String())
if response, err := http.Get(u.String()); err != nil || response.StatusCode != 200 {
mizu.Log.Infof("error sending upload entries req, status code: %v, err: %v", response.StatusCode, err)
} else {
mizu.Log.Infof(uiUtils.Purple, "Traffic is uploading to UP9 for further analysis")
}
}
func createRBACIfNecessary(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool {
mizuRBACExists, err := kubernetesProvider.DoesServiceAccountExist(ctx, mizu.ResourcesNamespace, mizu.ServiceAccountName)
if err != nil {
@@ -407,7 +416,6 @@ func syncApiStatus(ctx context.Context, cancel context.CancelFunc, tappingOption
time.Sleep(10 * time.Second)
}
}
}
func getNamespace(tappingOptions *MizuTapOptions, kubernetesProvider *kubernetes.Provider) string {

View File

@@ -2,6 +2,7 @@ package kubernetes
import (
"fmt"
"github.com/up9inc/mizu/cli/mizu"
"k8s.io/kubectl/pkg/proxy"
"net"
"net/http"
@@ -13,6 +14,7 @@ const k8sProxyApiPrefix = "/"
const mizuServicePort = 80
func StartProxy(kubernetesProvider *Provider, mizuPort uint16, mizuNamespace string, mizuServiceName string) error {
mizu.Log.Debugf("Starting proxy. namespace: [%v], service name: [%s], port: [%v]", mizuNamespace, mizuServiceName, mizuPort)
filter := &proxy.FilterServer{
AcceptPaths: proxy.MakeRegexpArrayOrDie(proxy.DefaultPathAcceptRE),
RejectPaths: proxy.MakeRegexpArrayOrDie(proxy.DefaultPathRejectRE),

View File

@@ -55,7 +55,7 @@ func CheckNewerVersion() {
client := github.NewClient(nil)
latestRelease, _, err := client.Repositories.GetLatestRelease(context.Background(), "up9inc", "mizu")
if err != nil {
Log.Debugf("Failed to get latest release")
Log.Debugf("[ERROR] Failed to get latest release")
return
}
@@ -67,20 +67,20 @@ func CheckNewerVersion() {
}
}
if versionFileUrl == "" {
Log.Debugf("Version file not found in the latest release")
Log.Debugf("[ERROR] Version file not found in the latest release")
return
}
res, err := http.Get(versionFileUrl)
if err != nil {
Log.Debugf("http.Get version asset -> %v", err)
Log.Debugf("[ERROR] Failed to get the version file %v", err)
return
}
data, err := ioutil.ReadAll(res.Body)
res.Body.Close()
if err != nil {
Log.Debugf("ioutil.ReadAll -> %v", err)
Log.Debugf("[ERROR] Failed to read the version file -> %v", err)
return
}
gitHubVersion := string(data)

View File

@@ -7,6 +7,7 @@ const (
WebSocketMessageTypeTappedEntry WebSocketMessageType = "tappedEntry"
WebSocketMessageTypeUpdateStatus WebSocketMessageType = "status"
WebSocketMessageTypeAnalyzeStatus WebSocketMessageType = "analyzeStatus"
WebsocketMessageTypeOutboundLink WebSocketMessageType = "outboundLink"
)
type WebSocketMessageMetadata struct {
@@ -31,7 +32,8 @@ type WebSocketStatusMessage struct {
}
type TapStatus struct {
Pods []PodInfo `json:"pods"`
Pods []PodInfo `json:"pods"`
TLSLinks []TLSLinkInfo `json:"tlsLinks"`
}
type PodInfo struct {
@@ -39,6 +41,13 @@ type PodInfo struct {
Name string `json:"name"`
}
type TLSLinkInfo struct {
SourceIP string `json:"sourceIP"`
DestinationAddress string `json:"destinationAddress"`
ResolvedDestinationName string `json:"resolvedDestinationName"`
ResolvedSourceName string `json:"resolvedSourceName"`
}
func CreateWebSocketStatusMessage(tappingStatus TapStatus) WebSocketStatusMessage {
return WebSocketStatusMessage{
WebSocketMessageMetadata: &WebSocketMessageMetadata{

View File

@@ -8,4 +8,5 @@ require (
github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231
github.com/romana/rlog v0.0.0-20171115192701-f018bc92e7d7
golang.org/x/net v0.0.0-20210421230115-4e50805a0758
github.com/bradleyfalzon/tlsx v0.0.0-20170624122154-28fd0e59bac4
)

View File

@@ -1,3 +1,5 @@
github.com/bradleyfalzon/tlsx v0.0.0-20170624122154-28fd0e59bac4 h1:NJOOlc6ZJjix0A1rAU+nxruZtR8KboG1848yqpIUo4M=
github.com/bradleyfalzon/tlsx v0.0.0-20170624122154-28fd0e59bac4/go.mod h1:DQPxZS994Ld1Y8uwnJT+dRL04XPD0cElP/pHH/zEBHM=
github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8=
github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo=
github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=

View File

@@ -5,21 +5,25 @@ import (
"bytes"
"encoding/hex"
"fmt"
"github.com/bradleyfalzon/tlsx"
"io"
"io/ioutil"
"net/http"
"strconv"
"sync"
"time"
)
const checkTLSPacketAmount = 100
type httpReaderDataMsg struct {
bytes []byte
timestamp time.Time
}
type tcpID struct {
srcIP string
dstIP string
srcIP string
dstIP string
srcPort string
dstPort string
}
@@ -42,28 +46,44 @@ func (tid *tcpID) String() string {
* Implements io.Reader interface (Read)
*/
type httpReader struct {
ident string
tcpID tcpID
isClient bool
isHTTP2 bool
isOutgoing bool
msgQueue chan httpReaderDataMsg // Channel of captured reassembled tcp payload
data []byte
captureTime time.Time
hexdump bool
parent *tcpStream
grpcAssembler GrpcAssembler
messageCount uint
harWriter *HarWriter
ident string
tcpID tcpID
isClient bool
isHTTP2 bool
isOutgoing bool
msgQueue chan httpReaderDataMsg // Channel of captured reassembled tcp payload
data []byte
captureTime time.Time
hexdump bool
parent *tcpStream
grpcAssembler GrpcAssembler
messageCount uint
harWriter *HarWriter
packetsSeen uint
outboundLinkWriter *OutboundLinkWriter
}
func (h *httpReader) Read(p []byte) (int, error) {
var msg httpReaderDataMsg
ok := true
for ok && len(h.data) == 0 {
msg, ok = <-h.msgQueue
h.data = msg.bytes
h.captureTime = msg.timestamp
if len(h.data) > 0 {
h.packetsSeen += 1
}
if h.packetsSeen < checkTLSPacketAmount && len(msg.bytes) > 5 { // packets with less than 5 bytes cause tlsx to panic
clientHello := tlsx.ClientHello{}
err := clientHello.Unmarshall(msg.bytes)
if err == nil {
fmt.Printf("Detected TLS client hello with SNI %s\n", clientHello.SNI)
numericPort, _ := strconv.Atoi(h.tcpID.dstPort)
h.outboundLinkWriter.WriteOutboundLink(h.tcpID.srcIP, h.tcpID.dstIP, numericPort, clientHello.SNI, TLSProtocol)
}
}
}
if !ok || len(h.data) == 0 {
return 0, io.EOF

View File

@@ -1,9 +1,17 @@
package tap
type OutboundLinkProtocol string
const (
TLSProtocol OutboundLinkProtocol = "tls"
)
type OutboundLink struct {
Src string
DstIP string
DstPort int
SuggestedResolvedName string
SuggestedProtocol OutboundLinkProtocol
}
func NewOutboundLinkWriter() *OutboundLinkWriter {
@@ -16,11 +24,13 @@ type OutboundLinkWriter struct {
OutChan chan *OutboundLink
}
func (olw *OutboundLinkWriter) WriteOutboundLink(src string, DstIP string, DstPort int) {
func (olw *OutboundLinkWriter) WriteOutboundLink(src string, DstIP string, DstPort int, SuggestedResolvedName string, SuggestedProtocol OutboundLinkProtocol) {
olw.OutChan <- &OutboundLink{
Src: src,
DstIP: DstIP,
DstPort: DstPort,
SuggestedResolvedName: SuggestedResolvedName,
SuggestedProtocol: SuggestedProtocol,
}
}

View File

@@ -33,7 +33,7 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
dstPort := int(tcp.DstPort)
if factory.shouldNotifyOnOutboundLink(dstIp, dstPort) {
factory.outbountLinkWriter.WriteOutboundLink(net.Src().String(), dstIp, dstPort)
factory.outbountLinkWriter.WriteOutboundLink(net.Src().String(), dstIp, dstPort, "", "")
}
props := factory.getStreamProps(srcIp, dstIp, dstPort)
isHTTP := props.isTapTarget
@@ -57,11 +57,12 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
srcPort: transport.Src().String(),
dstPort: transport.Dst().String(),
},
hexdump: *hexdump,
parent: stream,
isClient: true,
isOutgoing: props.isOutgoing,
harWriter: factory.harWriter,
hexdump: *hexdump,
parent: stream,
isClient: true,
isOutgoing: props.isOutgoing,
harWriter: factory.harWriter,
outboundLinkWriter: factory.outbountLinkWriter,
}
stream.server = httpReader{
msgQueue: make(chan httpReaderDataMsg),
@@ -72,10 +73,11 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
srcPort: transport.Dst().String(),
dstPort: transport.Src().String(),
},
hexdump: *hexdump,
parent: stream,
isOutgoing: props.isOutgoing,
harWriter: factory.harWriter,
hexdump: *hexdump,
parent: stream,
isOutgoing: props.isOutgoing,
harWriter: factory.harWriter,
outboundLinkWriter: factory.outbountLinkWriter,
}
factory.wg.Add(2)
// Start reading from channels stream.client.bytes and stream.server.bytes
@@ -131,6 +133,5 @@ func (factory *tcpStreamFactory) shouldNotifyOnOutboundLink(dstIP string, dstPor
type streamProps struct {
isTapTarget bool
isOutgoing bool
isOutgoing bool
}

22900
ui/package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -4,6 +4,7 @@
"private": true,
"dependencies": {
"@material-ui/core": "^4.11.3",
"@material-ui/lab": "^4.0.0-alpha.60",
"@testing-library/jest-dom": "^5.11.10",
"@testing-library/react": "^11.2.6",
"@testing-library/user-event": "^12.8.3",

View File

@@ -1,10 +1,12 @@
import React, {useState} from 'react';
import React, {useEffect, useState} from 'react';
import './App.sass';
import logo from './components/assets/Mizu-logo.svg';
import {Button} from "@material-ui/core";
import {Button, Snackbar} from "@material-ui/core";
import {HarPage} from "./components/HarPage";
import Tooltip from "./components/Tooltip";
import {makeStyles} from "@material-ui/core/styles";
import MuiAlert from '@material-ui/lab/Alert';
import Api from "./helpers/api";
const useStyles = makeStyles(() => ({
@@ -15,11 +17,37 @@ const useStyles = makeStyles(() => ({
},
}));
const api = new Api();
const App = () => {
const classes = useStyles();
const [analyzeStatus, setAnalyzeStatus] = useState(null);
const [showTLSWarning, setShowTLSWarning] = useState(false);
const [userDismissedTLSWarning, setUserDismissedTLSWarning] = useState(false);
const [addressesWithTLS, setAddressesWithTLS] = useState(new Set());
useEffect(() => {
(async () => {
const recentTLSLinks = await api.getRecentTLSLinks();
if (recentTLSLinks?.length > 0) {
setAddressesWithTLS(new Set([...addressesWithTLS, ...recentTLSLinks]));
setShowTLSWarning(true);
}
})();
}, []);
const onTLSDetected = (destAddress: string) => {
addressesWithTLS.add(destAddress);
setAddressesWithTLS(new Set(addressesWithTLS));
if (!userDismissedTLSWarning) {
setShowTLSWarning(true);
}
};
const analysisMessage = analyzeStatus?.isRemoteReady ?
<span>
@@ -88,7 +116,12 @@ const App = () => {
</Tooltip>
}
</div>
<HarPage setAnalyzeStatus={setAnalyzeStatus}/>
<HarPage setAnalyzeStatus={setAnalyzeStatus} onTLSDetected={onTLSDetected}/>
<Snackbar open={showTLSWarning && !userDismissedTLSWarning}>
<MuiAlert elevation={6} variant="filled" onClose={() => setUserDismissedTLSWarning(true)} severity="warning">
Mizu is detecting TLS traffic{addressesWithTLS.size ? ` (directed to ${Array.from(addressesWithTLS).join(", ")})` : ''}, this type of traffic will not be displayed.
</MuiAlert>
</Snackbar>
</div>
);
}

View File

@@ -38,11 +38,12 @@ enum ConnectionStatus {
interface HarPageProps {
setAnalyzeStatus: (status: any) => void;
onTLSDetected: (destAddress: string) => void;
}
const api = new Api();
export const HarPage: React.FC<HarPageProps> = ({setAnalyzeStatus}) => {
export const HarPage: React.FC<HarPageProps> = ({setAnalyzeStatus, onTLSDetected}) => {
const classes = useLayoutStyles();
@@ -93,6 +94,9 @@ export const HarPage: React.FC<HarPageProps> = ({setAnalyzeStatus}) => {
case "analyzeStatus":
setAnalyzeStatus(message.analyzeStatus);
break
case "outboundLink":
onTLSDetected(message.Data.DstIP);
break;
default:
console.error(`unsupported websocket message type, Got: ${message.messageType}`)
}

View File

@@ -42,4 +42,9 @@ export default class Api {
const response = await this.client.get(`/entries?limit=50&operator=${operator}&timestamp=${timestamp}`);
return response.data;
}
}
getRecentTLSLinks = async () => {
const response = await this.client.get("/recentTLSLinks");
return response.data;
}
}