Compare commits


34 Commits

Author  SHA1  Message  Date
Alex Haiut  fd5b72a66e  #minor  2021-06-20 14:49:54 +03:00
Alex Haiut  826f2cfb04  fixed release readme  2021-06-20 14:33:37 +03:00
Alex Haiut  e6f140adb2  use separate checksum files  2021-06-20 14:24:03 +03:00
Alex Haiut  1af2cd728d  using markdown for release text  2021-06-20 14:08:03 +03:00
Alex Haiut  d5f6093084  fixed build error - created bin directory upfront  2021-06-20 13:56:07 +03:00
Alex Haiut  32ad2f17c3  added checksum calc to CLI makefile  2021-06-20 13:52:10 +03:00
Roee Gadot  553433622e  no message  2021-06-20 10:42:27 +03:00
Roee Gadot  b6917ed0b7  no message  2021-06-20 08:53:12 +03:00
Roee Gadot  d22e5a9620  #minor  2021-06-20 08:24:04 +03:00
Roee Gadot  e880a9224f  no message  2021-06-20 08:17:48 +03:00
Roee Gadot  67c3e5aab5  no message  2021-06-19 10:06:20 +03:00
Roee Gadot  ed791c988e  no message  2021-06-19 10:00:22 +03:00
Roee Gadot  3567f5c00b  no message  2021-06-19 09:40:19 +03:00
Roee Gadot  605c67c22f  no message  2021-06-17 16:57:30 +03:00
Roee Gadot  9b44838fed  no message  2021-06-17 15:59:43 +03:00
Roee Gadot  9f6abf7a32  no message  2021-06-17 11:23:05 +03:00
gadotroee  609f85e6ae  Update tag-temp.yaml  2021-06-15 09:33:47 +03:00
gadotroee  d2188bfc22  Update tag-temp.yaml  2021-06-15 09:12:58 +03:00
gadotroee  d5942c3c49  Create tag-temp.yaml  2021-06-15 08:59:28 +03:00
Roee Gadot  f03c2adce4  remove main.yml and fix branches  2021-06-13 08:38:20 +03:00
Roee Gadot  93ebec09b9  no message  2021-06-10 21:41:20 +03:00
Roee Gadot  06c7b9c748  no message  2021-06-10 21:36:44 +03:00
Roee Gadot  db66119182  missing )  2021-06-10 21:33:22 +03:00
Roee Gadot  2ae6153631  no message  2021-06-10 21:32:17 +03:00
Roee Gadot  2dfb0b2549  no message  2021-06-10 21:23:40 +03:00
Roee Gadot  a2b774ae1f  no message  2021-06-10 21:21:11 +03:00
Roee Gadot  862f4252c3  yaml error  2021-06-10 21:17:07 +03:00
Roee Gadot  d5f057156e  no message  2021-06-10 21:16:06 +03:00
Roee Gadot  96ce694945  trying new approach  2021-06-10 21:13:15 +03:00
gadotroee  2d7ff95426  Update main.yml  2021-06-08 15:17:15 +03:00
gadotroee  af9daf91cf  Update main.yml  2021-06-08 15:08:55 +03:00
gadotroee  ea82d1779b  Update main.yml  2021-06-08 15:05:55 +03:00
gadotroee  51ddbf3815  Update main.yml  2021-06-08 15:01:23 +03:00
gadotroee  8bbdcb6877  Create main.yml  2021-06-08 14:54:13 +03:00
48 changed files with 821 additions and 1064 deletions

View File

@@ -4,6 +4,7 @@ on:
branches:
- develop
- main
- my-temp-release-check
jobs:
docker:
runs-on: ubuntu-latest

View File

@@ -18,14 +18,12 @@ WORKDIR /app/api-build
COPY api/go.mod api/go.sum ./
COPY shared/go.mod shared/go.mod ../shared/
COPY tap/go.mod tap/go.mod ../tap/
RUN go mod download
# cheap trick to make the build faster (as long as go.mod wasn't changed)
RUN go list -f '{{.Path}}@{{.Version}}' -m all | sed 1d | grep -e 'go-cache' -e 'sqlite' | xargs go get
# Copy and build api code
COPY shared ../shared
COPY tap ../tap
COPY api .
RUN go build -ldflags="-s -w" -o mizuagent .

View File

@@ -7,21 +7,8 @@ Download `mizu` for your platform and operating system
### Latest stable release
* for MacOS - Intel
```
curl -Lo mizu \
https://github.com/up9inc/mizu/releases/latest/download/mizu_darwin_amd64 \
&& chmod 755 mizu
```
* for Linux - Intel 64bit
```
curl -Lo mizu \
https://github.com/up9inc/mizu/releases/latest/download/mizu_linux_amd64 \
&& chmod 755 mizu
```
SHA256 checksums are available on the [Releases](https://github.com/up9inc/mizu/releases) page.
* for MacOS - `curl -o mizu https://github.com/up9inc/mizu/releases/download/latest/mizu_darwin_amd64 && chmod 755 mizu`
* for Linux - `curl -o mizu https://github.com/up9inc/mizu/releases/download/latest/mizu_linux_amd64 && chmod 755 mizu`
### Development (unstable) build
Pick one from the [Releases](https://github.com/up9inc/mizu/releases) page.
@@ -32,39 +19,6 @@ Pick one from the [Releases](https://github.com/up9inc/mizu/releases) page.
2. Run `mizu PODNAME` or `mizu REGEX`
3. Open browser on `http://localhost:8899` as instructed ..
4. Watch the WebAPI traffic flowing ..
5. Type ^C to stop
## Examples
Run `mizu help` for usage options
To tap specific pod -
```
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
front-end-649fc5fd6-kqbtn 2/2 Running 0 7m
..
$ mizu tap front-end-649fc5fd6-kqbtn
+front-end-649fc5fd6-kqbtn
Web interface is now available at http://localhost:8899
^C
```
To tap multiple pods using regex -
```
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
carts-66c77f5fbb-fq65r 2/2 Running 0 20m
catalogue-5f4cb7cf5-7zrmn 2/2 Running 0 20m
front-end-649fc5fd6-kqbtn 2/2 Running 0 20m
..
$ mizu tap "^ca.*"
+carts-66c77f5fbb-fq65r
+catalogue-5f4cb7cf5-7zrmn
Web interface is now available at http://localhost:8899
^C
```
TBD
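
The README hunk above points readers at the SHA256 checksums on the Releases page but does not show a verification step. A minimal, hypothetical sketch in Go of computing the checksum of a downloaded binary so it can be compared against the published value (the `./mizu` path is a placeholder, not taken from the release notes):

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"os"
)

// Prints the SHA256 of the downloaded mizu binary; compare the output
// against the checksum listed on the Releases page.
func main() {
	f, err := os.Open("./mizu") // path to the downloaded binary (assumed)
	if err != nil {
		panic(err)
	}
	defer f.Close()

	h := sha256.New()
	if _, err := io.Copy(h, f); err != nil {
		panic(err)
	}
	fmt.Println(hex.EncodeToString(h.Sum(nil)))
}
```

From a shell, `shasum -a 256 mizu` produces the same digest.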

View File

@@ -11,12 +11,15 @@ require (
github.com/go-playground/universal-translator v0.17.0
github.com/go-playground/validator/v10 v10.5.0
github.com/gofiber/fiber/v2 v2.8.0
github.com/google/gopacket v1.1.19
github.com/google/martian v2.1.0+incompatible
github.com/gorilla/websocket v1.4.2
github.com/leodido/go-urn v1.2.1 // indirect
github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/up9inc/mizu/shared v0.0.0
github.com/up9inc/mizu/tap v0.0.0
go.mongodb.org/mongo-driver v1.5.1
golang.org/x/net v0.0.0-20210421230115-4e50805a0758
gorm.io/driver/sqlite v1.1.4
gorm.io/gorm v1.21.8
k8s.io/api v0.21.0
@@ -25,5 +28,3 @@ require (
)
replace github.com/up9inc/mizu/shared v0.0.0 => ../shared
replace github.com/up9inc/mizu/tap v0.0.0 => ../tap

View File

@@ -251,6 +251,7 @@ github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGV
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231 h1:fa50YL1pzKW+1SsBnJDOHppJN9stOEwS+CRWyUtyYGU=
github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE=
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=

View File

@@ -7,12 +7,12 @@ import (
"github.com/gofiber/fiber/v2"
"github.com/gorilla/websocket"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/tap"
"mizuserver/pkg/api"
"mizuserver/pkg/middleware"
"mizuserver/pkg/models"
"mizuserver/pkg/routes"
"mizuserver/pkg/sensitiveDataFiltering"
"mizuserver/pkg/tap"
"mizuserver/pkg/utils"
"os"
"os/signal"
@@ -26,21 +26,16 @@ var aggregatorAddress = flag.String("aggregator-address", "", "Address of mizu c
func main() {
flag.Parse()
hostMode := os.Getenv(shared.HostModeEnvVar) == "1"
tapOpts := &tap.TapOpts{HostMode: hostMode}
if !*shouldTap && !*aggregator && !*standalone{
panic("One of the flags --tap, --api or --standalone must be provided")
}
if *standalone {
harOutputChannel, outboundLinkOutputChannel := tap.StartPassiveTapper(tapOpts)
harOutputChannel := tap.StartPassiveTapper()
filteredHarChannel := make(chan *tap.OutputChannelItem)
go filterHarItems(harOutputChannel, filteredHarChannel, getTrafficFilteringOptions())
go filterHarHeaders(harOutputChannel, filteredHarChannel, getTrafficFilteringOptions())
go api.StartReadingEntries(filteredHarChannel, nil)
go api.StartReadingOutbound(outboundLinkOutputChannel)
hostApi(nil)
} else if *shouldTap {
if *aggregatorAddress == "" {
@@ -49,26 +44,21 @@ func main() {
tapTargets := getTapTargets()
if tapTargets != nil {
tap.SetFilterAuthorities(tapTargets)
fmt.Println("Filtering for the following authorities:", tap.GetFilterIPs())
tap.HostAppAddresses = tapTargets
fmt.Println("Filtering for the following addresses:", tap.HostAppAddresses)
}
harOutputChannel, outboundLinkOutputChannel := tap.StartPassiveTapper(tapOpts)
harOutputChannel := tap.StartPassiveTapper()
socketConnection, err := shared.ConnectToSocketServer(*aggregatorAddress, shared.DEFAULT_SOCKET_RETRIES, shared.DEFAULT_SOCKET_RETRY_SLEEP_TIME, false)
if err != nil {
panic(fmt.Sprintf("Error connecting to socket server at %s %v", *aggregatorAddress, err))
}
go pipeChannelToSocket(socketConnection, harOutputChannel)
go api.StartReadingOutbound(outboundLinkOutputChannel)
} else if *aggregator {
socketHarOutChannel := make(chan *tap.OutputChannelItem, 1000)
filteredHarChannel := make(chan *tap.OutputChannelItem)
go filterHarItems(socketHarOutChannel, filteredHarChannel, getTrafficFilteringOptions())
go api.StartReadingEntries(filteredHarChannel, nil)
go filterHarHeaders(socketHarOutChannel, filteredHarChannel, getTrafficFilteringOptions())
hostApi(socketHarOutChannel)
}
@@ -125,14 +115,9 @@ func getTrafficFilteringOptions() *shared.TrafficFilteringOptions {
return &filteringOptions
}
func filterHarItems(inChannel <- chan *tap.OutputChannelItem, outChannel chan *tap.OutputChannelItem, filterOptions *shared.TrafficFilteringOptions) {
func filterHarHeaders(inChannel <- chan *tap.OutputChannelItem, outChannel chan *tap.OutputChannelItem, filterOptions *shared.TrafficFilteringOptions) {
for message := range inChannel {
if message.ConnectionInfo.IsOutgoing && api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) {
continue
}
sensitiveDataFiltering.FilterSensitiveInfoFromHarRequest(message, filterOptions)
outChannel <- message
}
}

View File

@@ -5,20 +5,18 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/google/martian/har"
"go.mongodb.org/mongo-driver/bson/primitive"
"mizuserver/pkg/database"
"mizuserver/pkg/models"
"mizuserver/pkg/resolver"
"mizuserver/pkg/tap"
"mizuserver/pkg/utils"
"net/url"
"os"
"path"
"sort"
"strings"
"time"
"github.com/google/martian/har"
"github.com/up9inc/mizu/tap"
"go.mongodb.org/mongo-driver/bson/primitive"
)
var k8sResolver *resolver.Resolver
@@ -59,21 +57,14 @@ func startReadingFiles(workingDir string) {
for true {
dir, _ := os.Open(workingDir)
dirFiles, _ := dir.Readdir(-1)
sort.Sort(utils.ByModTime(dirFiles))
var harFiles []os.FileInfo
for _, fileInfo := range dirFiles {
if strings.HasSuffix(fileInfo.Name(), ".har") {
harFiles = append(harFiles, fileInfo)
}
}
sort.Sort(utils.ByModTime(harFiles))
if len(harFiles) == 0 {
if len(dirFiles) == 0 {
fmt.Printf("Waiting for new files\n")
time.Sleep(3 * time.Second)
continue
}
fileInfo := harFiles[0]
fileInfo := dirFiles[0]
inputFilePath := path.Join(workingDir, fileInfo.Name())
file, err := os.Open(inputFilePath)
utils.CheckErr(err)
@@ -84,7 +75,7 @@ func startReadingFiles(workingDir string) {
for _, entry := range inputHar.Log.Entries {
time.Sleep(time.Millisecond * 250)
saveHarToDb(entry, fileInfo.Name(), false)
saveHarToDb(entry, fileInfo.Name())
}
rmErr := os.Remove(inputFilePath)
utils.CheckErr(rmErr)
@@ -97,19 +88,11 @@ func startReadingChannel(outputItems <-chan *tap.OutputChannelItem) {
}
for item := range outputItems {
saveHarToDb(item.HarEntry, item.ConnectionInfo.ClientIP, item.ConnectionInfo.IsOutgoing)
saveHarToDb(item.HarEntry, item.RequestSenderIp)
}
}
func StartReadingOutbound(outboundLinkChannel <-chan *tap.OutboundLink) {
// tcpStreamFactory will block on write to channel. Empty channel to unblock.
// TODO: Make write to channel optional.
for range outboundLinkChannel {
}
}
func saveHarToDb(entry *har.Entry, sender string, isOutgoing bool) {
func saveHarToDb(entry *har.Entry, sender string) {
entryBytes, _ := json.Marshal(entry)
serviceName, urlPath, serviceHostName := getServiceNameFromUrl(entry.Request.URL)
entryId := primitive.NewObjectID().Hex()
@@ -133,7 +116,6 @@ func saveHarToDb(entry *har.Entry, sender string, isOutgoing bool) {
Timestamp: entry.StartedDateTime.UnixNano() / int64(time.Millisecond),
ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination,
IsOutgoing: isOutgoing,
}
database.GetEntriesTable().Create(&mizuEntry)
@@ -147,7 +129,3 @@ func getServiceNameFromUrl(inputUrl string) (string, string, string) {
utils.CheckErr(err)
return fmt.Sprintf("%s://%s", parsed.Scheme, parsed.Host), parsed.Path, parsed.Host
}
func CheckIsServiceIP(address string) bool {
return k8sResolver.CheckIsServiceIP(address)
}

View File

@@ -5,10 +5,10 @@ import (
"fmt"
"github.com/antoniodipinto/ikisocket"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/tap"
"mizuserver/pkg/controllers"
"mizuserver/pkg/models"
"mizuserver/pkg/routes"
"mizuserver/pkg/tap"
)
var browserClientSocketUUIDs = make([]string, 0)

View File

@@ -1,21 +1,14 @@
package controllers
import (
"bytes"
"compress/zlib"
"encoding/json"
"fmt"
"github.com/gofiber/fiber/v2"
"github.com/google/martian/har"
"io/ioutil"
"log"
"mizuserver/pkg/database"
"mizuserver/pkg/models"
"mizuserver/pkg/utils"
"mizuserver/pkg/validation"
"net/http"
"net/url"
"time"
)
const (
@@ -82,23 +75,11 @@ func GetHARs(c *fiber.Ctx) error {
return c.Status(fiber.StatusBadRequest).JSON(err)
}
var timestampFrom, timestampTo int64
if entriesFilter.From < 0 {
timestampFrom = 0
} else {
timestampFrom = entriesFilter.From
}
if entriesFilter.To <= 0 {
timestampTo = time.Now().UnixNano() / int64(time.Millisecond)
} else {
timestampTo = entriesFilter.To
}
var entries []models.MizuEntry
database.GetEntriesTable().
Where(fmt.Sprintf("timestamp BETWEEN %v AND %v", timestampFrom, timestampTo)).
Order(fmt.Sprintf("timestamp %s", order)).
// Where(fmt.Sprintf("timestamp %s %v", operatorSymbol, entriesFilter.Timestamp)).
Limit(1000).
Find(&entries)
if len(entries) > 0 {
@@ -144,119 +125,6 @@ func GetHARs(c *fiber.Ctx) error {
return c.Status(fiber.StatusOK).SendStream(buffer)
}
func uploadEntriesImpl(token string, model string, envPrefix string) {
sleepTime := time.Second * 10
var timestampFrom int64 = 0
for {
timestampTo := time.Now().UnixNano() / int64(time.Millisecond)
fmt.Printf("Getting entries from %v, to %v\n", timestampFrom, timestampTo)
entriesArray := getEntriesFromDb(timestampFrom, timestampTo)
if len(entriesArray) > 0 {
fmt.Printf("About to upload %v entries\n", len(entriesArray))
body, jMarshalErr := json.Marshal(entriesArray)
if jMarshalErr != nil {
log.Fatal(jMarshalErr)
}
var in bytes.Buffer
w := zlib.NewWriter(&in)
_, _ = w.Write(body)
_ = w.Close()
reqBody := ioutil.NopCloser(bytes.NewReader(in.Bytes()))
postUrl, _ := url.Parse(fmt.Sprintf("https://traffic.%s/dumpTrafficBulk/%s", envPrefix, model))
fmt.Println(postUrl)
req := &http.Request{
Method: http.MethodPost,
URL: postUrl,
Header: map[string][]string{
"Content-Encoding": {"deflate"},
"Content-Type": {"application/octet-stream"},
"Guest-Auth": {token},
},
Body: reqBody,
}
_, postErr := http.DefaultClient.Do(req)
if postErr != nil {
log.Fatal(postErr)
}
fmt.Printf("Finish uploading %v entries to %s\n", len(entriesArray), postUrl)
} else {
fmt.Println("Nothing to upload")
}
fmt.Printf("Sleeping for %v...\n", sleepTime)
time.Sleep(sleepTime)
timestampFrom = timestampTo
}
}
func UploadEntries(c *fiber.Ctx) error {
entriesFilter := &models.UploadEntriesRequestBody{}
if err := c.QueryParser(entriesFilter); err != nil {
return c.Status(fiber.StatusBadRequest).JSON(err)
}
if err := validation.Validate(entriesFilter); err != nil {
return c.Status(fiber.StatusBadRequest).JSON(err)
}
go uploadEntriesImpl(entriesFilter.Token, entriesFilter.Model, entriesFilter.Dest)
return c.Status(fiber.StatusOK).SendString("OK")
}
func GetFullEntries(c *fiber.Ctx) error {
entriesFilter := &models.HarFetchRequestBody{}
if err := c.QueryParser(entriesFilter); err != nil {
return c.Status(fiber.StatusBadRequest).JSON(err)
}
err := validation.Validate(entriesFilter)
if err != nil {
return c.Status(fiber.StatusBadRequest).JSON(err)
}
var timestampFrom, timestampTo int64
if entriesFilter.From < 0 {
timestampFrom = 0
} else {
timestampFrom = entriesFilter.From
}
if entriesFilter.To <= 0 {
timestampTo = time.Now().UnixNano() / int64(time.Millisecond)
} else {
timestampTo = entriesFilter.To
}
entriesArray := getEntriesFromDb(timestampFrom, timestampTo)
return c.Status(fiber.StatusOK).JSON(entriesArray)
}
func getEntriesFromDb(timestampFrom int64, timestampTo int64) []har.Entry {
order := OrderDesc
var entries []models.MizuEntry
database.GetEntriesTable().
Where(fmt.Sprintf("timestamp BETWEEN %v AND %v", timestampFrom, timestampTo)).
Order(fmt.Sprintf("timestamp %s", order)).
Find(&entries)
if len(entries) > 0 {
// the entries are always ordered from oldest to newest, so we should reverse
utils.ReverseSlice(entries)
}
entriesArray := make([]har.Entry, 0)
for _, entryData := range entries {
var harEntry har.Entry
_ = json.Unmarshal([]byte(entryData.Entry), &harEntry)
entriesArray = append(entriesArray, harEntry)
}
return entriesArray
}
func GetEntry(c *fiber.Ctx) error {
var entryData models.EntryData
database.GetEntriesTable().

View File

@@ -4,7 +4,7 @@ import (
"encoding/json"
"github.com/google/martian/har"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/tap"
"mizuserver/pkg/tap"
"time"
)
@@ -12,18 +12,17 @@ type MizuEntry struct {
ID uint `gorm:"primarykey"`
CreatedAt time.Time
UpdatedAt time.Time
Entry string `json:"entry,omitempty" gorm:"column:entry"`
EntryId string `json:"entryId" gorm:"column:entryId"`
Url string `json:"url" gorm:"column:url"`
Method string `json:"method" gorm:"column:method"`
Status int `json:"status" gorm:"column:status"`
RequestSenderIp string `json:"requestSenderIp" gorm:"column:requestSenderIp"`
Service string `json:"service" gorm:"column:service"`
Timestamp int64 `json:"timestamp" gorm:"column:timestamp"`
Path string `json:"path" gorm:"column:path"`
Entry string `json:"entry,omitempty" gorm:"column:entry"`
EntryId string `json:"entryId" gorm:"column:entryId"`
Url string `json:"url" gorm:"column:url"`
Method string `json:"method" gorm:"column:method"`
Status int `json:"status" gorm:"column:status"`
RequestSenderIp string `json:"requestSenderIp" gorm:"column:requestSenderIp"`
Service string `json:"service" gorm:"column:service"`
Timestamp int64 `json:"timestamp" gorm:"column:timestamp"`
Path string `json:"path" gorm:"column:path"`
ResolvedSource string `json:"resolvedSource,omitempty" gorm:"column:resolvedSource"`
ResolvedDestination string `json:"resolvedDestination,omitempty" gorm:"column:resolvedDestination"`
IsOutgoing bool `json:"isOutgoing,omitempty" gorm:"column:isOutgoing"`
}
type BaseEntryDetails struct {
@@ -35,11 +34,10 @@ type BaseEntryDetails struct {
StatusCode int `json:"statusCode,omitempty"`
Method string `json:"method,omitempty"`
Timestamp int64 `json:"timestamp,omitempty"`
IsOutgoing bool `json:"isOutgoing,omitempty"`
}
type EntryData struct {
Entry string `json:"entry,omitempty"`
Entry string `json:"entry,omitempty"`
ResolvedDestination string `json:"resolvedDestination,omitempty" gorm:"column:resolvedDestination"`
}
@@ -49,15 +47,8 @@ type EntriesFilter struct {
Timestamp int64 `query:"timestamp" validate:"required,min=1"`
}
type UploadEntriesRequestBody struct {
Token string `query:"token"`
Model string `query:"model"`
Dest string `query:"dest"`
}
type HarFetchRequestBody struct {
From int64 `query:"from"`
To int64 `query:"to"`
Limit int `query:"limit"`
}
type WebSocketEntryMessage struct {
@@ -65,6 +56,7 @@ type WebSocketEntryMessage struct {
Data *BaseEntryDetails `json:"data,omitempty"`
}
type WebSocketTappedEntryMessage struct {
*shared.WebSocketMessageMetadata
Data *tap.OutputChannelItem
@@ -90,6 +82,7 @@ func CreateWebsocketTappedEntryMessage(base *tap.OutputChannelItem) ([]byte, err
return json.Marshal(message)
}
// ExtendedHAR is the top level object of a HAR log.
type ExtendedHAR struct {
Log *ExtendedLog `json:"log"`
@@ -107,5 +100,5 @@ type ExtendedLog struct {
type ExtendedCreator struct {
*har.Creator
Source string `json:"_source"`
}
Source string `json:"_source"`
}

View File

@@ -21,7 +21,7 @@ func NewFromInCluster(errOut chan error) (*Resolver, error) {
if err != nil {
return nil, err
}
return &Resolver{clientConfig: config, clientSet: clientset, nameMap: make(map[string]string), serviceMap: make(map[string]string), errOut: errOut}, nil
return &Resolver{clientConfig: config, clientSet: clientset, nameMap: make(map[string]string), errOut: errOut}, nil
}
func NewFromOutOfCluster(kubeConfigPath string, errOut chan error) (*Resolver, error) {
@@ -53,9 +53,9 @@ func NewFromOutOfCluster(kubeConfigPath string, errOut chan error) (*Resolver, e
return nil, err
}
return &Resolver{clientConfig: clientConfig, clientSet: clientset, nameMap: make(map[string]string), serviceMap: make(map[string]string), errOut: errOut}, nil
return &Resolver{clientConfig: clientConfig, clientSet: clientset, nameMap: make(map[string]string), errOut: errOut}, nil
}
func NewFromExisting(clientConfig *restclient.Config, clientSet *kubernetes.Clientset, errOut chan error) *Resolver {
return &Resolver{clientConfig: clientConfig, clientSet: clientSet, nameMap: make(map[string]string), serviceMap: make(map[string]string), errOut: errOut}
return &Resolver{clientConfig: clientConfig, clientSet: clientSet, nameMap: make(map[string]string), errOut: errOut}
}

View File

@@ -20,7 +20,6 @@ type Resolver struct {
clientConfig *restclient.Config
clientSet *kubernetes.Clientset
nameMap map[string]string
serviceMap map[string]string
isStarted bool
errOut chan error
}
@@ -42,11 +41,6 @@ func (resolver *Resolver) Resolve(name string) string {
return resolvedName
}
func (resolver *Resolver) CheckIsServiceIP(address string) bool {
_, isFound := resolver.serviceMap[address]
return isFound
}
func (resolver *Resolver) watchPods(ctx context.Context) error {
// empty namespace makes the client watch all namespaces
watcher, err := resolver.clientSet.CoreV1().Pods("").Watch(ctx, metav1.ListOptions{Watch: true})
@@ -130,7 +124,6 @@ func (resolver *Resolver) watchServices(ctx context.Context) error {
serviceHostname := fmt.Sprintf("%s.%s", service.Name, service.Namespace)
if service.Spec.ClusterIP != "" && service.Spec.ClusterIP != kubClientNullString {
resolver.saveResolvedName(service.Spec.ClusterIP, serviceHostname, event.Type)
resolver.saveServiceIP(service.Spec.ClusterIP, serviceHostname, event.Type)
}
if service.Status.LoadBalancer.Ingress != nil {
for _, ingress := range service.Status.LoadBalancer.Ingress {
@@ -154,14 +147,6 @@ func (resolver *Resolver) saveResolvedName(key string, resolved string, eventTyp
}
}
func (resolver *Resolver) saveServiceIP(key string, resolved string, eventType watch.EventType) {
if eventType == watch.Deleted {
delete(resolver.serviceMap, key)
} else {
resolver.serviceMap[key] = resolved
}
}
func (resolver *Resolver) infiniteErrorHandleRetryFunc(ctx context.Context, fun func(ctx context.Context) error) {
for {
err := fun(ctx)

View File

@@ -11,11 +11,8 @@ func EntriesRoutes(fiberApp *fiber.App) {
routeGroup.Get("/entries", controllers.GetEntries) // get entries (base/thin entries)
routeGroup.Get("/entries/:entryId", controllers.GetEntry) // get single (full) entry
routeGroup.Get("/exportEntries", controllers.GetFullEntries)
routeGroup.Get("/uploadEntries", controllers.UploadEntries)
routeGroup.Get("/har", controllers.GetHARs)
routeGroup.Get("/resetDB", controllers.DeleteAllEntries) // get single (full) entry
routeGroup.Get("/generalStats", controllers.GetGeneralStats) // get general stats about entries in DB

View File

@@ -5,7 +5,7 @@ import (
"encoding/xml"
"errors"
"fmt"
"github.com/up9inc/mizu/tap"
"mizuserver/pkg/tap"
"net/url"
"strings"
@@ -15,8 +15,8 @@ import (
)
func FilterSensitiveInfoFromHarRequest(harOutputItem *tap.OutputChannelItem, options *shared.TrafficFilteringOptions) {
harOutputItem.HarEntry.Request.Headers = filterHarHeaders(harOutputItem.HarEntry.Request.Headers)
harOutputItem.HarEntry.Response.Headers = filterHarHeaders(harOutputItem.HarEntry.Response.Headers)
filterHarHeaders(harOutputItem.HarEntry.Request.Headers)
filterHarHeaders(harOutputItem.HarEntry.Response.Headers)
harOutputItem.HarEntry.Request.Cookies = make([]har.Cookie, 0, 0)
harOutputItem.HarEntry.Response.Cookies = make([]har.Cookie, 0, 0)
@@ -44,19 +44,12 @@ func FilterSensitiveInfoFromHarRequest(harOutputItem *tap.OutputChannelItem, opt
}
}
func filterHarHeaders(headers []har.Header) []har.Header {
newHeaders := make([]har.Header, 0)
func filterHarHeaders(headers []har.Header) {
for i, header := range headers {
if strings.ToLower(header.Name) == "cookie" {
continue
} else if isFieldNameSensitive(header.Name) {
newHeaders = append(newHeaders, har.Header{Name: header.Name, Value: maskedFieldPlaceholderValue})
if isFieldNameSensitive(header.Name) {
headers[i].Value = maskedFieldPlaceholderValue
} else {
newHeaders = append(newHeaders, header)
}
}
return newHeaders
}
func getContentTypeHeaderValue(headers []har.Header) string {

View File

@@ -84,14 +84,14 @@ type GrpcAssembler struct {
framer *http2.Framer
}
func (ga *GrpcAssembler) readMessage() (uint32, interface{}, error) {
func (ga *GrpcAssembler) readMessage() (uint32, interface{}, string, error) {
// Exactly one Framer is used for each half connection.
// (Instead of creating a new Framer for each ReadFrame operation)
// This is needed in order to decompress the headers,
// because the compression context is updated with each requests/response.
frame, err := ga.framer.ReadFrame()
if err != nil {
return 0, nil, err
return 0, nil, "", err
}
streamID := frame.Header().StreamID
@@ -99,7 +99,7 @@ func (ga *GrpcAssembler) readMessage() (uint32, interface{}, error) {
ga.fragmentsByStream.appendFrame(streamID, frame)
if !(ga.isStreamEnd(frame)) {
return 0, nil, nil
return 0, nil, "", nil
}
headers, data := ga.fragmentsByStream.pop(streamID)
@@ -137,10 +137,10 @@ func (ga *GrpcAssembler) readMessage() (uint32, interface{}, error) {
ContentLength: int64(len(dataString)),
}
} else {
return 0, nil, errors.New("Failed to assemble stream: neither a request nor a message")
return 0, nil, "", errors.New("Failed to assemble stream: neither a request nor a message")
}
return streamID, messageHTTP1, nil
return streamID, messageHTTP1, dataString, nil
}
func (ga *GrpcAssembler) isStreamEnd(frame http2.Frame) bool {

View File

@@ -4,7 +4,6 @@ import (
"encoding/json"
"errors"
"fmt"
"log"
"net/http"
"os"
"path/filepath"
@@ -16,8 +15,7 @@ import (
)
const readPermission = 0644
const harFilenameSuffix = ".har"
const tempFilenameSuffix = ".har.tmp"
const tempFilenamePrefix = "har_writer"
type PairChanItem struct {
Request *http.Request
@@ -25,13 +23,12 @@ type PairChanItem struct {
Response *http.Response
ResponseTime time.Time
RequestSenderIp string
ConnectionInfo *ConnectionInfo
}
func openNewHarFile(filename string) *HarFile {
file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, readPermission)
if err != nil {
log.Panicf("Failed to open output file: %s (%v,%+v)", err, err, err)
panic(fmt.Sprintf("Failed to open output file: %s (%v,%+v)", err, err, err))
}
harFile := HarFile{file: file, entryCount: 0}
@@ -48,13 +45,13 @@ type HarFile struct {
func NewEntry(request *http.Request, requestTime time.Time, response *http.Response, responseTime time.Time) (*har.Entry, error) {
harRequest, err := har.NewRequest(request, true)
if err != nil {
SilentError("convert-request-to-har", "Failed converting request to HAR %s (%v,%+v)", err, err, err)
SilentError("convert-request-to-har", "Failed converting request to HAR %s (%v,%+v)\n", err, err, err)
return nil, errors.New("Failed converting request to HAR")
}
harResponse, err := har.NewResponse(response, true)
if err != nil {
SilentError("convert-response-to-har", "Failed converting response to HAR %s (%v,%+v)", err, err, err)
SilentError("convert-response-to-har", "Failed converting response to HAR %s (%v,%+v)\n", err, err, err)
return nil, errors.New("Failed converting response to HAR")
}
@@ -65,7 +62,7 @@ func NewEntry(request *http.Request, requestTime time.Time, response *http.Respo
status, err := strconv.Atoi(response.Header.Get(":status"))
if err != nil {
SilentError("convert-response-status-for-har", "Failed converting status to int %s (%v,%+v)", err, err, err)
SilentError("convert-response-status-for-har", "Failed converting status to int %s (%v,%+v)\n", err, err, err)
return nil, errors.New("Failed converting response status to int for HAR")
}
harResponse.Status = status
@@ -105,7 +102,7 @@ func NewEntry(request *http.Request, requestTime time.Time, response *http.Respo
func (f *HarFile) WriteEntry(harEntry *har.Entry) {
harEntryJson, err := json.Marshal(harEntry)
if err != nil {
SilentError("har-entry-marshal", "Failed converting har entry object to JSON%s (%v,%+v)", err, err, err)
SilentError("har-entry-marshal", "Failed converting har entry object to JSON%s (%v,%+v)\n", err, err, err)
return
}
@@ -119,7 +116,7 @@ func (f *HarFile) WriteEntry(harEntry *har.Entry) {
harEntryString := append([]byte(separator), harEntryJson...)
if _, err := f.file.Write(harEntryString); err != nil {
log.Panicf("Failed to write to output file: %s (%v,%+v)", err, err, err)
panic(fmt.Sprintf("Failed to write to output file: %s (%v,%+v)", err, err, err))
}
f.entryCount++
@@ -134,21 +131,21 @@ func (f *HarFile) Close() {
err := f.file.Close()
if err != nil {
log.Panicf("Failed to close output file: %s (%v,%+v)", err, err, err)
panic(fmt.Sprintf("Failed to close output file: %s (%v,%+v)", err, err, err))
}
}
func (f*HarFile) writeHeader() {
header := []byte(`{"log": {"version": "1.2", "creator": {"name": "Mizu", "version": "0.0.1"}, "entries": [`)
if _, err := f.file.Write(header); err != nil {
log.Panicf("Failed to write header to output file: %s (%v,%+v)", err, err, err)
panic(fmt.Sprintf("Failed to write header to output file: %s (%v,%+v)", err, err, err))
}
}
func (f*HarFile) writeTrailer() {
trailer := []byte("]}}")
if _, err := f.file.Write(trailer); err != nil {
log.Panicf("Failed to write trailer to output file: %s (%v,%+v)", err, err, err)
panic(fmt.Sprintf("Failed to write trailer to output file: %s (%v,%+v)", err, err, err))
}
}
@@ -164,8 +161,8 @@ func NewHarWriter(outputDir string, maxEntries int) *HarWriter {
}
type OutputChannelItem struct {
HarEntry *har.Entry
ConnectionInfo *ConnectionInfo
HarEntry *har.Entry
RequestSenderIp string
}
type HarWriter struct {
@@ -177,20 +174,20 @@ type HarWriter struct {
done chan bool
}
func (hw *HarWriter) WritePair(request *http.Request, requestTime time.Time, response *http.Response, responseTime time.Time, connectionInfo *ConnectionInfo) {
func (hw *HarWriter) WritePair(request *http.Request, requestTime time.Time, response *http.Response, responseTime time.Time, requestSenderIp string) {
hw.PairChan <- &PairChanItem{
Request: request,
RequestTime: requestTime,
Response: response,
ResponseTime: responseTime,
ConnectionInfo: connectionInfo,
Request: request,
RequestTime: requestTime,
Response: response,
ResponseTime: responseTime,
RequestSenderIp: requestSenderIp,
}
}
func (hw *HarWriter) Start() {
if hw.OutputDirPath != "" {
if err := os.MkdirAll(hw.OutputDirPath, os.ModePerm); err != nil {
log.Panicf("Failed to create output directory: %s (%v,%+v)", err, err, err)
panic(fmt.Sprintf("Failed to create output directory: %s (%v,%+v)", err, err, err))
}
}
@@ -213,8 +210,8 @@ func (hw *HarWriter) Start() {
}
} else {
hw.OutChan <- &OutputChannelItem{
HarEntry: harEntry,
ConnectionInfo: pair.ConnectionInfo,
HarEntry: harEntry,
RequestSenderIp: pair.RequestSenderIp,
}
}
}
@@ -229,11 +226,10 @@ func (hw *HarWriter) Start() {
func (hw *HarWriter) Stop() {
close(hw.PairChan)
<-hw.done
close(hw.OutChan)
}
func (hw *HarWriter) openNewFile() {
filename := buildFilename(hw.OutputDirPath, time.Now(), tempFilenameSuffix)
filename := filepath.Join(os.TempDir(), fmt.Sprintf("%s_%d", tempFilenamePrefix, time.Now().UnixNano()))
hw.currentFile = openNewHarFile(filename)
}
@@ -242,15 +238,15 @@ func (hw *HarWriter) closeFile() {
tmpFilename := hw.currentFile.file.Name()
hw.currentFile = nil
filename := buildFilename(hw.OutputDirPath, time.Now(), harFilenameSuffix)
filename := buildFilename(hw.OutputDirPath, time.Now())
err := os.Rename(tmpFilename, filename)
if err != nil {
SilentError("Rename-file", "cannot rename file: %s (%v,%+v)", err, err, err)
SilentError("Rename-file", "cannot rename file: %s (%v,%+v)\n", err, err, err)
}
}
func buildFilename(dir string, t time.Time, suffix string) string {
func buildFilename(dir string, t time.Time) string {
// (epoch time in nanoseconds)__(YYYY_Month_DD__hh-mm-ss).har
filename := fmt.Sprintf("%d__%s%s", t.UnixNano(), t.Format("2006_Jan_02__15-04-05"), suffix)
filename := fmt.Sprintf("%d__%s.har", t.UnixNano(), t.Format("2006_Jan_02__15-04-05"))
return filepath.Join(dir, filename)
}

api/pkg/tap/http_matcher.go (new file, 209 lines)
View File

@@ -0,0 +1,209 @@
package tap
import (
"fmt"
"net/http"
"strconv"
"strings"
"time"
"github.com/orcaman/concurrent-map"
)
type requestResponsePair struct {
Request httpMessage `json:"request"`
Response httpMessage `json:"response"`
}
type envoyMessageWrapper struct {
HttpBufferedTrace requestResponsePair `json:"http_buffered_trace"`
}
type headerKeyVal struct {
Key string `json:"key"`
Value string `json:"value"`
}
type messageBody struct {
Truncated bool `json:"truncated"`
AsBytes string `json:"as_bytes"`
}
type httpMessage struct {
IsRequest bool
Headers []headerKeyVal `json:"headers"`
HTTPVersion string `json:"httpVersion"`
Body messageBody `json:"body"`
captureTime time.Time
orig interface {}
requestSenderIp string
}
// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port}
type requestResponseMatcher struct {
openMessagesMap cmap.ConcurrentMap
}
func createResponseRequestMatcher() requestResponseMatcher {
newMatcher := &requestResponseMatcher{openMessagesMap: cmap.New()}
return *newMatcher
}
func (matcher *requestResponseMatcher) registerRequest(ident string, request *http.Request, captureTime time.Time, body string, isHTTP2 bool) *envoyMessageWrapper {
split := splitIdent(ident)
key := genKey(split)
messageExtraHeaders := []headerKeyVal{
{Key: "x-up9-source", Value: split[0]},
{Key: "x-up9-destination", Value: split[1] + ":" + split[3]},
}
requestHTTPMessage := requestToMessage(request, captureTime, body, &messageExtraHeaders, isHTTP2, split[0])
if response, found := matcher.openMessagesMap.Pop(key); found {
// Type assertion always succeeds because all of the map's values are of httpMessage type
responseHTTPMessage := response.(*httpMessage)
if responseHTTPMessage.IsRequest {
SilentError("Request-Duplicate", "Got duplicate request with same identifier\n")
return nil
}
Debug("Matched open Response for %s\n", key)
return matcher.preparePair(&requestHTTPMessage, responseHTTPMessage)
}
matcher.openMessagesMap.Set(key, &requestHTTPMessage)
Debug("Registered open Request for %s\n", key)
return nil
}
func (matcher *requestResponseMatcher) registerResponse(ident string, response *http.Response, captureTime time.Time, body string, isHTTP2 bool) *envoyMessageWrapper {
split := splitIdent(ident)
key := genKey(split)
responseHTTPMessage := responseToMessage(response, captureTime, body, isHTTP2)
if request, found := matcher.openMessagesMap.Pop(key); found {
// Type assertion always succeeds because all of the map's values are of httpMessage type
requestHTTPMessage := request.(*httpMessage)
if !requestHTTPMessage.IsRequest {
SilentError("Response-Duplicate", "Got duplicate response with same identifier\n")
return nil
}
Debug("Matched open Request for %s\n", key)
return matcher.preparePair(requestHTTPMessage, &responseHTTPMessage)
}
matcher.openMessagesMap.Set(key, &responseHTTPMessage)
Debug("Registered open Response for %s\n", key)
return nil
}
func (matcher *requestResponseMatcher) preparePair(requestHTTPMessage *httpMessage, responseHTTPMessage *httpMessage) *envoyMessageWrapper {
matcher.addDuration(requestHTTPMessage, responseHTTPMessage)
return &envoyMessageWrapper{
HttpBufferedTrace: requestResponsePair{
Request: *requestHTTPMessage,
Response: *responseHTTPMessage,
},
}
}
func requestToMessage(request *http.Request, captureTime time.Time, body string, messageExtraHeaders *[]headerKeyVal, isHTTP2 bool, requestSenderIp string) httpMessage {
messageHeaders := make([]headerKeyVal, 0)
for key, value := range request.Header {
messageHeaders = append(messageHeaders, headerKeyVal{Key: key, Value: value[0]})
}
if !isHTTP2 {
messageHeaders = append(messageHeaders, headerKeyVal{Key: ":method", Value: request.Method})
messageHeaders = append(messageHeaders, headerKeyVal{Key: ":path", Value: request.RequestURI})
messageHeaders = append(messageHeaders, headerKeyVal{Key: ":authority", Value: request.Host})
messageHeaders = append(messageHeaders, headerKeyVal{Key: ":scheme", Value: "http"})
}
messageHeaders = append(messageHeaders, headerKeyVal{Key: "x-request-start", Value: fmt.Sprintf("%.3f", float64(captureTime.UnixNano()) / float64(1000000000))})
messageHeaders = append(messageHeaders, *messageExtraHeaders...)
httpVersion := request.Proto
requestBody := messageBody{Truncated: false, AsBytes: body}
return httpMessage{
IsRequest: true,
Headers: messageHeaders,
HTTPVersion: httpVersion,
Body: requestBody,
captureTime: captureTime,
orig: request,
requestSenderIp: requestSenderIp,
}
}
func responseToMessage(response *http.Response, captureTime time.Time, body string, isHTTP2 bool) httpMessage {
messageHeaders := make([]headerKeyVal, 0)
for key, value := range response.Header {
messageHeaders = append(messageHeaders, headerKeyVal{Key: key, Value: value[0]})
}
if !isHTTP2 {
messageHeaders = append(messageHeaders, headerKeyVal{Key: ":status", Value: strconv.Itoa(response.StatusCode)})
}
httpVersion := response.Proto
requestBody := messageBody{Truncated: false, AsBytes: body}
return httpMessage{
IsRequest: false,
Headers: messageHeaders,
HTTPVersion: httpVersion,
Body: requestBody,
captureTime: captureTime,
orig: response,
}
}
func (matcher *requestResponseMatcher) addDuration(requestHTTPMessage *httpMessage, responseHTTPMessage *httpMessage) {
durationMs := float64(responseHTTPMessage.captureTime.UnixNano() / 1000000) - float64(requestHTTPMessage.captureTime.UnixNano() / 1000000)
if durationMs < 1 {
durationMs = 1
}
responseHTTPMessage.Headers = append(responseHTTPMessage.Headers, headerKeyVal{Key: "x-up9-duration-ms", Value: fmt.Sprintf("%.0f", durationMs)})
}
func splitIdent(ident string) []string {
ident = strings.Replace(ident, "->", " ", -1)
return strings.Split(ident, " ")
}
func genKey(split []string) string {
key := fmt.Sprintf("%s:%s->%s:%s,%s", split[0], split[2], split[1], split[3], split[4])
return key
}
func (matcher *requestResponseMatcher) deleteOlderThan(t time.Time) int {
keysToPop := make([]string, 0)
for item := range matcher.openMessagesMap.IterBuffered() {
// Map only contains values of type httpMessage
message, _ := item.Val.(*httpMessage)
if message.captureTime.Before(t) {
keysToPop = append(keysToPop, item.Key)
}
}
numDeleted := len(keysToPop)
for _, key := range keysToPop {
_, _ = matcher.openMessagesMap.Pop(key)
}
return numDeleted
}
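
The new matcher keys open messages by `{client_addr}:{client_port}->{dest_addr}:{dest_port}` plus a per-stream counter, derived from the reader's ident string. A self-contained sketch of that key derivation, mirroring `splitIdent`/`genKey` above and assuming the ident format `srcIP->dstIP srcPort->dstPort counter`:

```go
package main

import (
	"fmt"
	"strings"
)

// genKeyFromIdent reproduces the matcher's key derivation: split the ident
// into [srcIP, dstIP, srcPort, dstPort, counter] and rebuild it as
// "client:port->server:port,counter".
func genKeyFromIdent(ident string) string {
	split := strings.Split(strings.Replace(ident, "->", " ", -1), " ")
	return fmt.Sprintf("%s:%s->%s:%s,%s", split[0], split[2], split[1], split[3], split[4])
}

func main() {
	fmt.Println(genKeyFromIdent("10.0.0.1->10.0.0.2 51234->8080 1"))
	// Output: 10.0.0.1:51234->10.0.0.2:8080,1
}
```

A request registered under this key is matched when the response for the same connection and counter arrives, and vice versa.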

View File

@@ -3,7 +3,10 @@ package tap
import (
"bufio"
"bytes"
"compress/gzip"
b64 "encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"io/ioutil"
@@ -24,14 +27,6 @@ type tcpID struct {
dstPort string
}
type ConnectionInfo struct {
ClientIP string
ClientPort string
ServerIP string
ServerPort string
IsOutgoing bool
}
func (tid *tcpID) String() string {
return fmt.Sprintf("%s->%s %s->%s", tid.srcIP, tid.dstIP, tid.srcPort, tid.dstPort)
}
@@ -46,7 +41,6 @@ type httpReader struct {
tcpID tcpID
isClient bool
isHTTP2 bool
isOutgoing bool
msgQueue chan httpReaderDataMsg // Channel of captured reassembled tcp payload
data []byte
captureTime time.Time
@@ -79,7 +73,7 @@ func (h *httpReader) run(wg *sync.WaitGroup) {
b := bufio.NewReader(h)
if isHTTP2, err := checkIsHTTP2Connection(b, h.isClient); err != nil {
SilentError("HTTP/2-Prepare-Connection", "stream %s Failed to check if client is HTTP/2: %s (%v,%+v)", h.ident, err, err, err)
SilentError("HTTP/2-Prepare-Connection", "stream %s Failed to check if client is HTTP/2: %s (%v,%+v)\n", h.ident, err, err, err)
// Do something?
} else {
h.isHTTP2 = isHTTP2
@@ -88,7 +82,7 @@ func (h *httpReader) run(wg *sync.WaitGroup) {
if h.isHTTP2 {
err := prepareHTTP2Connection(b, h.isClient)
if err != nil {
SilentError("HTTP/2-Prepare-Connection-After-Check", "stream %s error: %s (%v,%+v)", h.ident, err, err, err)
SilentError("HTTP/2-Prepare-Connection-After-Check", "stream %s error: %s (%v,%+v)\n", h.ident, err, err, err)
}
h.grpcAssembler = createGrpcAssembler(b)
}
@@ -99,7 +93,7 @@ func (h *httpReader) run(wg *sync.WaitGroup) {
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
SilentError("HTTP/2", "stream %s error: %s (%v,%+v)", h.ident, err, err, err)
SilentError("HTTP/2", "stream %s error: %s (%v,%+v)\n", h.ident, err, err, err)
continue
}
} else if h.isClient {
@@ -107,7 +101,7 @@ func (h *httpReader) run(wg *sync.WaitGroup) {
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
SilentError("HTTP-request", "stream %s Request error: %s (%v,%+v)", h.ident, err, err, err)
SilentError("HTTP-request", "stream %s Request error: %s (%v,%+v)\n", h.ident, err, err, err)
continue
}
} else {
@@ -115,7 +109,7 @@ func (h *httpReader) run(wg *sync.WaitGroup) {
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
SilentError("HTTP-response", "stream %s Response error: %s (%v,%+v)", h.ident, err, err, err)
SilentError("HTTP-response", "stream %s Response error: %s (%v,%+v)\n", h.ident, err, err, err)
continue
}
}
@@ -123,49 +117,38 @@ func (h *httpReader) run(wg *sync.WaitGroup) {
}
func (h *httpReader) handleHTTP2Stream() error {
streamID, messageHTTP1, err := h.grpcAssembler.readMessage()
streamID, messageHTTP1, body, err := h.grpcAssembler.readMessage()
h.messageCount++
if err != nil {
return err
}
var reqResPair *requestResponsePair
var connectionInfo *ConnectionInfo
var reqResPair *envoyMessageWrapper
switch messageHTTP1 := messageHTTP1.(type) {
case http.Request:
ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.srcIP, h.tcpID.dstIP, h.tcpID.srcPort, h.tcpID.dstPort, streamID)
connectionInfo = &ConnectionInfo{
ClientIP: h.tcpID.srcIP,
ClientPort: h.tcpID.srcPort,
ServerIP: h.tcpID.dstIP,
ServerPort: h.tcpID.dstPort,
IsOutgoing: h.isOutgoing,
}
reqResPair = reqResMatcher.registerRequest(ident, &messageHTTP1, h.captureTime)
reqResPair = reqResMatcher.registerRequest(ident, &messageHTTP1, h.captureTime, body, true)
case http.Response:
ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.dstIP, h.tcpID.srcIP, h.tcpID.dstPort, h.tcpID.srcPort, streamID)
connectionInfo = &ConnectionInfo{
ClientIP: h.tcpID.dstIP,
ClientPort: h.tcpID.dstPort,
ServerIP: h.tcpID.srcIP,
ServerPort: h.tcpID.srcPort,
IsOutgoing: h.isOutgoing,
}
reqResPair = reqResMatcher.registerResponse(ident, &messageHTTP1, h.captureTime)
reqResPair = reqResMatcher.registerResponse(ident, &messageHTTP1, h.captureTime, body, true)
}
if reqResPair != nil {
statsTracker.incMatchedMessages()
if h.harWriter != nil {
h.harWriter.WritePair(
reqResPair.Request.orig.(*http.Request),
reqResPair.Request.captureTime,
reqResPair.Response.orig.(*http.Response),
reqResPair.Response.captureTime,
connectionInfo,
reqResPair.HttpBufferedTrace.Request.orig.(*http.Request),
reqResPair.HttpBufferedTrace.Request.captureTime,
reqResPair.HttpBufferedTrace.Response.orig.(*http.Response),
reqResPair.HttpBufferedTrace.Response.captureTime,
reqResPair.HttpBufferedTrace.Request.requestSenderIp,
)
} else {
jsonStr, err := json.Marshal(reqResPair)
if err != nil {
return err
}
broadcastReqResPair(jsonStr)
}
}
@@ -182,35 +165,37 @@ func (h *httpReader) handleHTTP1ClientStream(b *bufio.Reader) error {
req.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
s := len(body)
if err != nil {
SilentError("HTTP-request-body", "stream %s Got body err: %s", h.ident, err)
SilentError("HTTP-request-body", "stream %s Got body err: %s\n", h.ident, err)
} else if h.hexdump {
Info("Body(%d/0x%x) - %s", len(body), len(body), hex.Dump(body))
Info("Body(%d/0x%x)\n%s\n", len(body), len(body), hex.Dump(body))
}
if err := req.Body.Close(); err != nil {
SilentError("HTTP-request-body-close", "stream %s Failed to close request body: %s", h.ident, err)
SilentError("HTTP-request-body-close", "stream %s Failed to close request body: %s\n", h.ident, err)
}
encoding := req.Header["Content-Encoding"]
Info("HTTP/1 Request: %s %s %s (Body:%d) -> %s", h.ident, req.Method, req.URL, s, encoding)
bodyStr, err := readBody(body, encoding)
if err != nil {
SilentError("HTTP-request-body-decode", "stream %s Failed to decode body: %s\n", h.ident, err)
}
Info("HTTP/%s Request: %s %s (Body:%d)\n", h.ident, req.Method, req.URL, s)
ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.srcIP, h.tcpID.dstIP, h.tcpID.srcPort, h.tcpID.dstPort, h.messageCount)
reqResPair := reqResMatcher.registerRequest(ident, req, h.captureTime)
reqResPair := reqResMatcher.registerRequest(ident, req, h.captureTime, bodyStr, false)
if reqResPair != nil {
statsTracker.incMatchedMessages()
if h.harWriter != nil {
h.harWriter.WritePair(
reqResPair.Request.orig.(*http.Request),
reqResPair.Request.captureTime,
reqResPair.Response.orig.(*http.Response),
reqResPair.Response.captureTime,
&ConnectionInfo{
ClientIP: h.tcpID.srcIP,
ClientPort: h.tcpID.srcPort,
ServerIP: h.tcpID.dstIP,
ServerPort: h.tcpID.dstPort,
IsOutgoing: h.isOutgoing,
},
reqResPair.HttpBufferedTrace.Request.orig.(*http.Request),
reqResPair.HttpBufferedTrace.Request.captureTime,
reqResPair.HttpBufferedTrace.Response.orig.(*http.Response),
reqResPair.HttpBufferedTrace.Response.captureTime,
reqResPair.HttpBufferedTrace.Request.requestSenderIp,
)
} else {
jsonStr, err := json.Marshal(reqResPair)
if err != nil {
SilentError("HTTP-marshal", "stream %s Error convert request response to json: %s\n", h.ident, err)
}
broadcastReqResPair(jsonStr)
}
}
@@ -239,13 +224,13 @@ func (h *httpReader) handleHTTP1ServerStream(b *bufio.Reader) error {
res.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
s := len(body)
if err != nil {
SilentError("HTTP-response-body", "HTTP/%s: failed to get body(parsed len:%d): %s", h.ident, s, err)
SilentError("HTTP-response-body", "HTTP/%s: failed to get body(parsed len:%d): %s\n", h.ident, s, err)
}
if h.hexdump {
Info("Body(%d/0x%x) - %s", len(body), len(body), hex.Dump(body))
Info("Body(%d/0x%x)\n%s\n", len(body), len(body), hex.Dump(body))
}
if err := res.Body.Close(); err != nil {
SilentError("HTTP-response-body-close", "HTTP/%s: failed to close body(parsed len:%d): %s", h.ident, s, err)
SilentError("HTTP-response-body-close", "HTTP/%s: failed to close body(parsed len:%d): %s\n", h.ident, s, err)
}
sym := ","
if res.ContentLength > 0 && res.ContentLength != int64(s) {
@@ -256,29 +241,54 @@ func (h *httpReader) handleHTTP1ServerStream(b *bufio.Reader) error {
contentType = []string{http.DetectContentType(body)}
}
encoding := res.Header["Content-Encoding"]
Info("HTTP/1 Response: %s %s URL:%s (%d%s%d%s) -> %s", h.ident, res.Status, req, res.ContentLength, sym, s, contentType, encoding)
Info("HTTP/%s Response: %s URL:%s (%d%s%d%s) -> %s\n", h.ident, res.Status, req, res.ContentLength, sym, s, contentType, encoding)
bodyStr, err := readBody(body, encoding)
if err != nil {
SilentError("HTTP-response-body-decode", "stream %s Failed to decode body: %s\n", h.ident, err)
}
ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.dstIP, h.tcpID.srcIP, h.tcpID.dstPort, h.tcpID.srcPort, h.messageCount)
reqResPair := reqResMatcher.registerResponse(ident, res, h.captureTime)
reqResPair := reqResMatcher.registerResponse(ident, res, h.captureTime, bodyStr, false)
if reqResPair != nil {
statsTracker.incMatchedMessages()
if h.harWriter != nil {
h.harWriter.WritePair(
reqResPair.Request.orig.(*http.Request),
reqResPair.Request.captureTime,
reqResPair.Response.orig.(*http.Response),
reqResPair.Response.captureTime,
&ConnectionInfo{
ClientIP: h.tcpID.dstIP,
ClientPort: h.tcpID.dstPort,
ServerIP: h.tcpID.srcIP,
ServerPort: h.tcpID.srcPort,
IsOutgoing: h.isOutgoing,
},
reqResPair.HttpBufferedTrace.Request.orig.(*http.Request),
reqResPair.HttpBufferedTrace.Request.captureTime,
reqResPair.HttpBufferedTrace.Response.orig.(*http.Response),
reqResPair.HttpBufferedTrace.Response.captureTime,
reqResPair.HttpBufferedTrace.Request.requestSenderIp,
)
} else {
jsonStr, err := json.Marshal(reqResPair)
if err != nil {
SilentError("HTTP-marshal", "stream %s Error convert request response to json: %s\n", h.ident, err)
}
broadcastReqResPair(jsonStr)
}
}
return nil
}
func readBody(bodyBytes []byte, encoding []string) (string, error) {
var bodyBuffer io.Reader
bodyBuffer = bytes.NewBuffer(bodyBytes)
var err error
if len(encoding) > 0 && (encoding[0] == "gzip" || encoding[0] == "deflate") {
bodyBuffer, err = gzip.NewReader(bodyBuffer)
if err != nil {
SilentError("HTTP-gunzip", "Failed to gzip decode: %s\n", err)
return "", err
}
}
if _, ok := bodyBuffer.(*gzip.Reader); ok {
err = bodyBuffer.(*gzip.Reader).Close()
if err != nil {
return "", err
}
}
buf := new(bytes.Buffer)
_, err = buf.ReadFrom(bodyBuffer)
return b64.StdEncoding.EncodeToString(buf.Bytes()), err
}
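
In the new flow, `readBody` gunzips the payload when the Content-Encoding calls for it and returns the result base64-encoded, which is what ends up in `messageBody.AsBytes`. A small round-trip sketch (the sample payload is made up) showing the same gzip-then-base64 handling and how a consumer would decode the stored body:

```go
package main

import (
	"bytes"
	"compress/gzip"
	b64 "encoding/base64"
	"fmt"
	"io"
)

func main() {
	original := []byte(`{"status":"ok"}`)

	// Simulate a gzip-encoded response body, as a server might send it.
	var compressed bytes.Buffer
	zw := gzip.NewWriter(&compressed)
	zw.Write(original)
	zw.Close()

	// What readBody does for Content-Encoding: gzip - decompress, then base64-encode.
	zr, err := gzip.NewReader(bytes.NewReader(compressed.Bytes()))
	if err != nil {
		panic(err)
	}
	decompressed, _ := io.ReadAll(zr)
	zr.Close()
	asBytes := b64.StdEncoding.EncodeToString(decompressed)

	// A consumer of messageBody.AsBytes decodes it back to the raw payload.
	decoded, _ := b64.StdEncoding.DecodeString(asBytes)
	fmt.Printf("%s\n", decoded) // {"status":"ok"}
}
```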

View File

@@ -10,8 +10,10 @@ package tap
import (
"encoding/hex"
"encoding/json"
"flag"
"fmt"
"github.com/up9inc/mizu/shared"
"log"
"os"
"os/signal"
@@ -31,10 +33,12 @@ import (
)
const AppPortsEnvVar = "APP_PORTS"
const OutPortEnvVar = "WEB_SOCKET_PORT"
const maxHTTP2DataLenEnvVar = "HTTP2_DATA_SIZE_LIMIT"
// default is 1MB, more than the max size accepted by collector and traffic-dumper
const maxHTTP2DataLenDefault = 1 * 1024 * 1024
const cleanPeriod = time.Second * 10
const outboundThrottleCacheExpiryPeriod = time.Minute * 15
var remoteOnlyOutboundPorts = []int { 80, 443 }
func parseAppPorts(appPortsList string) []int {
@@ -42,7 +46,7 @@ func parseAppPorts(appPortsList string) []int {
for _, portStr := range strings.Split(appPortsList, ",") {
parsedInt, parseError := strconv.Atoi(portStr)
if parseError != nil {
log.Printf("Provided app port %v is not a valid number!", portStr)
fmt.Println("Provided app port ", portStr, " is not a valid number!")
} else {
ports = append(ports, parsedInt)
}
@@ -50,6 +54,13 @@ func parseAppPorts(appPortsList string) []int {
return ports
}
func parseHostAppAddresses(hostAppAddressesString string) []string {
if len(hostAppAddressesString) == 0 {
return []string{}
}
return strings.Split(hostAppAddressesString, ",")
}
var maxcount = flag.Int("c", -1, "Only grab this many packets, then exit")
var decoder = flag.String("decoder", "", "Name of the decoder to use (default: guess from capture)")
var statsevery = flag.Int("stats", 60, "Output statistics every N seconds")
@@ -79,6 +90,7 @@ var tstype = flag.String("timestamp_type", "", "Type of timestamps to use")
var promisc = flag.Bool("promisc", true, "Set promiscuous mode")
var anydirection = flag.Bool("anydirection", false, "Capture http requests to other hosts")
var staleTimeoutSeconds = flag.Int("staletimout", 120, "Max time in seconds to keep connections which don't transmit data")
var hostAppAddressesString = flag.String("targets", "", "Comma separated list of ip:ports to tap")
var memprofile = flag.String("memprofile", "", "Write memory profile")
@@ -109,20 +121,24 @@ var stats struct {
overlapPackets int
}
type TapOpts struct {
HostMode bool
type CollectorMessage struct {
MessageType string
Ports *[]int `json:"ports,omitempty"`
Addresses *[]string `json:"addresses,omitempty"`
}
var outputLevel int
var errorsMap map[string]uint
var errorsMapMutex sync.Mutex
var nErrors uint
var ownIps []string // global
var hostMode bool // global
var appPorts []int // global
var ownIps []string //global
var hostMode bool //global
var HostAppAddresses []string //global
/* minOutputLevel: Error will be printed only if outputLevel is above this value
* t: key for errorsMap (counting errors)
* s, a: arguments log.Printf
* s, a: arguments fmt.Printf
* Note: Too bad for perf that a... is evaluated
*/
func logError(minOutputLevel int, t string, s string, a ...interface{}) {
@@ -133,7 +149,7 @@ func logError(minOutputLevel int, t string, s string, a ...interface{}) {
errorsMapMutex.Unlock()
if outputLevel >= minOutputLevel {
formatStr := fmt.Sprintf("%s: %s", t, s)
log.Printf(formatStr, a...)
fmt.Printf(formatStr, a...)
}
}
func Error(t string, s string, a ...interface{}) {
@@ -144,12 +160,12 @@ func SilentError(t string, s string, a ...interface{}) {
}
func Info(s string, a ...interface{}) {
if outputLevel >= 1 {
log.Printf(s, a...)
fmt.Printf(s, a...)
}
}
func Debug(s string, a ...interface{}) {
if outputLevel >= 2 {
log.Printf(s, a...)
fmt.Printf(s, a...)
}
}
@@ -171,8 +187,9 @@ func inArrayString(arr []string, valueToCheck string) bool {
return false
}
// Context
// The assembler context
/*
* The assembler context
*/
type Context struct {
CaptureInfo gopacket.CaptureInfo
}
@@ -181,27 +198,22 @@ func (c *Context) GetCaptureInfo() gopacket.CaptureInfo {
return c.CaptureInfo
}
func StartPassiveTapper(opts *TapOpts) (<-chan *OutputChannelItem, <-chan *OutboundLink) {
hostMode = opts.HostMode
func StartPassiveTapper() <-chan *OutputChannelItem {
var harWriter *HarWriter
if *dumpToHar {
harWriter = NewHarWriter(*HarOutputDir, *harEntriesPerFile)
}
outboundLinkWriter := NewOutboundLinkWriter()
go startPassiveTapper(harWriter, outboundLinkWriter)
go startPassiveTapper(harWriter)
if harWriter != nil {
return harWriter.OutChan, outboundLinkWriter.OutChan
return harWriter.OutChan
}
return nil, outboundLinkWriter.OutChan
return nil
}
func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWriter) {
log.SetFlags(log.LstdFlags | log.LUTC | log.Lshortfile)
func startPassiveTapper(harWriter *HarWriter) {
defer util.Run()()
if *debug {
outputLevel = 2
@@ -214,43 +226,68 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
if localhostIPs, err := getLocalhostIPs(); err != nil {
// TODO: think this over
log.Println("Failed to get self IP addresses")
Error("Getting-Self-Address", "Error getting self ip address: %s (%v,%+v)", err, err, err)
fmt.Println("Failed to get self IP addresses")
Error("Getting-Self-Address", "Error getting self ip address: %s (%v,%+v)\n", err, err, err)
ownIps = make([]string, 0)
} else {
ownIps = localhostIPs
}
appPortsStr := os.Getenv(AppPortsEnvVar)
var appPorts []int
if appPortsStr == "" {
log.Println("Received empty/no APP_PORTS env var! only listening to http on port 80!")
fmt.Println("Received empty/no APP_PORTS env var! only listening to http on port 80!")
appPorts = make([]int, 0)
} else {
appPorts = parseAppPorts(appPortsStr)
}
SetFilterPorts(appPorts)
tapOutputPort := os.Getenv(OutPortEnvVar)
if tapOutputPort == "" {
fmt.Println("Received empty/no WEB_SOCKET_PORT env var! falling back to port 8080")
tapOutputPort = "8080"
}
envVal := os.Getenv(maxHTTP2DataLenEnvVar)
if envVal == "" {
log.Println("Received empty/no HTTP2_DATA_SIZE_LIMIT env var! falling back to", maxHTTP2DataLenDefault)
fmt.Println("Received empty/no HTTP2_DATA_SIZE_LIMIT env var! falling back to", maxHTTP2DataLenDefault)
maxHTTP2DataLen = maxHTTP2DataLenDefault
} else {
if convertedInt, err := strconv.Atoi(envVal); err != nil {
log.Println("Received invalid HTTP2_DATA_SIZE_LIMIT env var! falling back to", maxHTTP2DataLenDefault)
fmt.Println("Received invalid HTTP2_DATA_SIZE_LIMIT env var! falling back to", maxHTTP2DataLenDefault)
maxHTTP2DataLen = maxHTTP2DataLenDefault
} else {
log.Println("Received HTTP2_DATA_SIZE_LIMIT env var:", maxHTTP2DataLenDefault)
fmt.Println("Received HTTP2_DATA_SIZE_LIMIT env var:", maxHTTP2DataLenDefault)
maxHTTP2DataLen = convertedInt
}
}
hostMode = os.Getenv(shared.HostModeEnvVar) == "1"
log.Printf("App Ports: %v", gSettings.filterPorts)
fmt.Printf("App Ports: %v\n", appPorts)
fmt.Printf("Tap output websocket port: %s\n", tapOutputPort)
var onCollectorMessage = func(message []byte) {
var parsedMessage CollectorMessage
err := json.Unmarshal(message, &parsedMessage)
if err == nil {
if parsedMessage.MessageType == "setPorts" {
Debug("Got message from collector. Type: %s, Ports: %v\n", parsedMessage.MessageType, parsedMessage.Ports)
appPorts = *parsedMessage.Ports
} else if parsedMessage.MessageType == "setAddresses" {
Debug("Got message from collector. Type: %s, IPs: %v\n", parsedMessage.MessageType, parsedMessage.Addresses)
HostAppAddresses = *parsedMessage.Addresses
Info("Filtering for the following addresses: %s\n", HostAppAddresses)
}
} else {
Error("Collector-Message-Parsing", "Error parsing message from collector: %s (%v,%+v)\n", err, err, err)
}
}
go startOutputServer(tapOutputPort, onCollectorMessage)
var handle *pcap.Handle
var err error
if *fname != "" {
if handle, err = pcap.OpenOffline(*fname); err != nil {
log.Fatalf("PCAP OpenOffline error: %v", err)
log.Fatal("PCAP OpenOffline error:", err)
}
} else {
// This is a little complicated because we want to allow all possible options
@@ -276,15 +313,15 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
}
}
if handle, err = inactive.Activate(); err != nil {
log.Fatalf("PCAP Activate error: %v", err)
log.Fatal("PCAP Activate error:", err)
}
defer handle.Close()
}
if len(flag.Args()) > 0 {
bpffilter := strings.Join(flag.Args(), " ")
Info("Using BPF filter %q", bpffilter)
Info("Using BPF filter %q\n", bpffilter)
if err = handle.SetBPFFilter(bpffilter); err != nil {
log.Fatalf("BPF filter error: %v", err)
log.Fatal("BPF filter error:", err)
}
}
@@ -292,7 +329,6 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
harWriter.Start()
defer harWriter.Stop()
}
defer outboundLinkWriter.Stop()
var dec gopacket.Decoder
var ok bool
@@ -306,18 +342,13 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
source := gopacket.NewPacketSource(handle, dec)
source.Lazy = *lazy
source.NoCopy = true
Info("Starting to read packets")
Info("Starting to read packets\n")
count := 0
bytes := int64(0)
start := time.Now()
defragger := ip4defrag.NewIPv4Defragmenter()
streamFactory := &tcpStreamFactory{
doHTTP: !*nohttp,
harWriter: harWriter,
outbountLinkWriter: outboundLinkWriter,
}
streamFactory := &tcpStreamFactory{doHTTP: !*nohttp, harWriter: harWriter}
streamPool := reassembly.NewStreamPool(streamFactory)
assembler := reassembly.NewAssembler(streamPool)
var assemblerMutex sync.Mutex
@@ -347,7 +378,7 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
errorMapLen := len(errorsMap)
errorsSummery := fmt.Sprintf("%v", errorsMap)
errorsMapMutex.Unlock()
log.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v) - Errors Summary: %s",
fmt.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)\nErrors Summary: %s\n",
count,
bytes,
time.Since(start),
@@ -359,8 +390,8 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
// At this moment
memStats := runtime.MemStats{}
runtime.ReadMemStats(&memStats)
log.Printf(
"mem: %d, goroutines: %d, unmatched messages: %d",
fmt.Printf(
"mem: %d, goroutines: %d, unmatched messages: %d\n",
memStats.HeapAlloc,
runtime.NumGoroutine(),
reqResMatcher.openMessagesMap.Count(),
@@ -369,8 +400,8 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
// Since the last print
cleanStats := cleaner.dumpStats()
appStats := statsTracker.dumpStats()
log.Printf(
"flushed connections %d, closed connections: %d, deleted messages: %d, matched messages: %d",
fmt.Printf(
"flushed connections %d, closed connections: %d, deleted messages: %d, matched messages: %d\n",
cleanStats.flushed,
cleanStats.closed,
cleanStats.deleted,
@@ -381,11 +412,11 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
for packet := range source.Packets() {
count++
Debug("PACKET #%d", count)
Debug("PACKET #%d\n", count)
data := packet.Data()
bytes += int64(len(data))
if *hexdumppkt {
Debug("Packet content (%d/0x%x) - %s", len(data), len(data), hex.Dump(data))
Debug("Packet content (%d/0x%x)\n%s\n", len(data), len(data), hex.Dump(data))
}
// defrag the IPv4 packet if required
@@ -400,18 +431,18 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
if err != nil {
log.Fatalln("Error while de-fragmenting", err)
} else if newip4 == nil {
Debug("Fragment...")
Debug("Fragment...\n")
continue // packet fragment, we don't have whole packet yet.
}
if newip4.Length != l {
stats.ipdefrag++
Debug("Decoding re-assembled packet: %s", newip4.NextLayerType())
Debug("Decoding re-assembled packet: %s\n", newip4.NextLayerType())
pb, ok := packet.(gopacket.PacketBuilder)
if !ok {
log.Panic("Not a PacketBuilder")
panic("Not a PacketBuilder")
}
nextDecoder := newip4.NextLayerType()
_ = nextDecoder.Decode(newip4.Payload, pb)
nextDecoder.Decode(newip4.Payload, pb)
}
}
@@ -428,7 +459,7 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
CaptureInfo: packet.Metadata().CaptureInfo,
}
stats.totalsz += len(tcp.Payload)
// log.Println(packet.NetworkLayer().NetworkFlow().Src(), ":", tcp.SrcPort, " -> ", packet.NetworkLayer().NetworkFlow().Dst(), ":", tcp.DstPort)
//fmt.Println(packet.NetworkLayer().NetworkFlow().Src(), ":", tcp.SrcPort, " -> ", packet.NetworkLayer().NetworkFlow().Dst(), ":", tcp.DstPort)
assemblerMutex.Lock()
assembler.AssembleWithContext(packet.NetworkLayer().NetworkFlow(), tcp, &c)
assemblerMutex.Unlock()
@@ -439,11 +470,11 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
errorsMapMutex.Lock()
errorMapLen := len(errorsMap)
errorsMapMutex.Unlock()
log.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)", count, bytes, time.Since(start), nErrors, errorMapLen)
fmt.Fprintf(os.Stderr, "Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)\n", count, bytes, time.Since(start), nErrors, errorMapLen)
}
select {
case <-signalChan:
log.Printf("Caught SIGINT: aborting")
fmt.Fprintf(os.Stderr, "\nCaught SIGINT: aborting\n")
done = true
default:
// NOP: continue
@@ -466,34 +497,34 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
if err != nil {
log.Fatal(err)
}
_ = pprof.WriteHeapProfile(f)
_ = f.Close()
pprof.WriteHeapProfile(f)
f.Close()
}
streamFactory.WaitGoRoutines()
assemblerMutex.Lock()
Debug("%s", assembler.Dump())
Debug("%s\n", assembler.Dump())
assemblerMutex.Unlock()
if !*nodefrag {
log.Printf("IPdefrag:\t\t%d", stats.ipdefrag)
fmt.Printf("IPdefrag:\t\t%d\n", stats.ipdefrag)
}
log.Printf("TCP stats:")
log.Printf(" missed bytes:\t\t%d", stats.missedBytes)
log.Printf(" total packets:\t\t%d", stats.pkt)
log.Printf(" rejected FSM:\t\t%d", stats.rejectFsm)
log.Printf(" rejected Options:\t%d", stats.rejectOpt)
log.Printf(" reassembled bytes:\t%d", stats.sz)
log.Printf(" total TCP bytes:\t%d", stats.totalsz)
log.Printf(" conn rejected FSM:\t%d", stats.rejectConnFsm)
log.Printf(" reassembled chunks:\t%d", stats.reassembled)
log.Printf(" out-of-order packets:\t%d", stats.outOfOrderPackets)
log.Printf(" out-of-order bytes:\t%d", stats.outOfOrderBytes)
log.Printf(" biggest-chunk packets:\t%d", stats.biggestChunkPackets)
log.Printf(" biggest-chunk bytes:\t%d", stats.biggestChunkBytes)
log.Printf(" overlap packets:\t%d", stats.overlapPackets)
log.Printf(" overlap bytes:\t\t%d", stats.overlapBytes)
log.Printf("Errors: %d", nErrors)
fmt.Printf("TCP stats:\n")
fmt.Printf(" missed bytes:\t\t%d\n", stats.missedBytes)
fmt.Printf(" total packets:\t\t%d\n", stats.pkt)
fmt.Printf(" rejected FSM:\t\t%d\n", stats.rejectFsm)
fmt.Printf(" rejected Options:\t%d\n", stats.rejectOpt)
fmt.Printf(" reassembled bytes:\t%d\n", stats.sz)
fmt.Printf(" total TCP bytes:\t%d\n", stats.totalsz)
fmt.Printf(" conn rejected FSM:\t%d\n", stats.rejectConnFsm)
fmt.Printf(" reassembled chunks:\t%d\n", stats.reassembled)
fmt.Printf(" out-of-order packets:\t%d\n", stats.outOfOrderPackets)
fmt.Printf(" out-of-order bytes:\t%d\n", stats.outOfOrderBytes)
fmt.Printf(" biggest-chunk packets:\t%d\n", stats.biggestChunkPackets)
fmt.Printf(" biggest-chunk bytes:\t%d\n", stats.biggestChunkBytes)
fmt.Printf(" overlap packets:\t%d\n", stats.overlapPackets)
fmt.Printf(" overlap bytes:\t\t%d\n", stats.overlapBytes)
fmt.Printf("Errors: %d\n", nErrors)
for e := range errorsMap {
log.Printf(" %s:\t\t%d", e, errorsMap[e])
fmt.Printf(" %s:\t\t%d\n", e, errorsMap[e])
}
}
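For reference, everything the function above reads comes from environment variables (the APP_PORTS, WEB_SOCKET_PORT and HTTP2_DATA_SIZE_LIMIT names are taken from the log messages in this hunk) plus the CLI flags wired up elsewhere in this compare. A rough Go sketch of launching the agent locally with that configuration, assuming a ./mizuagent binary built from this tree; the HOST_MODE name and the comma-separated APP_PORTS format are assumptions:

package main

import (
	"os"
	"os/exec"
)

func main() {
	// Flags mirror the daemonset command used elsewhere in this compare.
	cmd := exec.Command("./mizuagent", "-i", "any", "--tap", "--hardump")
	cmd.Env = append(os.Environ(),
		"APP_PORTS=8080,9090",           // ports treated as tapped application ports (format assumed)
		"WEB_SOCKET_PORT=8080",          // port for the tap output websocket server
		"HTTP2_DATA_SIZE_LIMIT=1048576", // limit on buffered HTTP/2 data
		"HOST_MODE=0",                   // assumed name for shared.HostModeEnvVar
	)
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	if err := cmd.Run(); err != nil {
		panic(err)
	}
}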

api/pkg/tap/tap_output.go Normal file

@@ -0,0 +1,239 @@
package tap
import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
"time"
"github.com/gorilla/websocket"
"github.com/patrickmn/go-cache"
)
const (
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second
// Time allowed to read the next pong message from the peer.
pongWait = 60 * time.Second
// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
// Maximum message size allowed from peer.
maxMessageSize = 512
)
var (
newline = []byte{'\n'}
space = []byte{' '}
hub *Hub
outboundSocketNotifyExpiringCache = cache.New(outboundThrottleCacheExpiryPeriod, outboundThrottleCacheExpiryPeriod)
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func (_ *http.Request) bool { return true },
}
// Client is a middleman between the websocket connection and the hub.
type Client struct {
hub *Hub
// The websocket connection.
conn *websocket.Conn
// Buffered channel of outbound messages.
send chan []byte
}
type OutBoundLinkMessage struct {
SourceIP string `json:"sourceIP"`
IP string `json:"ip"`
Port int `json:"port"`
Type string `json:"type"`
}
// readPump pumps messages from the websocket connection to the hub.
//
// The application runs readPump in a per-connection goroutine. The application
// ensures that there is at most one reader on a connection by executing all
// reads from this goroutine.
func (c *Client) readPump() {
defer func() {
c.hub.unregister <- c
c.conn.Close()
}()
c.conn.SetReadLimit(maxMessageSize)
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
for {
_, message, err := c.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Printf("error: %v", err)
}
break
}
message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
c.hub.onMessageCallback(message)
}
}
// writePump pumps messages from the hub to the websocket connection.
//
// A goroutine running writePump is started for each connection. The
// application ensures that there is at most one writer to a connection by
// executing all writes from this goroutine.
func (c *Client) writePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.conn.Close()
}()
for {
select {
case message, ok := <-c.send:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
// The hub closed the channel.
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
w, err := c.conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
w.Write(message)
if err := w.Close(); err != nil {
return
}
case <-ticker.C:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}
type Hub struct {
// Registered clients.
clients map[*Client]bool
// Inbound messages from the clients.
broadcast chan []byte
// Register requests from the clients.
register chan *Client
// Unregister requests from clients.
unregister chan *Client
// Handle messages from client
onMessageCallback func([]byte)
}
func newHub(onMessageCallback func([]byte)) *Hub {
return &Hub{
broadcast: make(chan []byte),
register: make(chan *Client),
unregister: make(chan *Client),
clients: make(map[*Client]bool),
onMessageCallback: onMessageCallback,
}
}
func (h *Hub) run() {
for {
select {
case client := <-h.register:
h.clients[client] = true
case client := <-h.unregister:
if _, ok := h.clients[client]; ok {
delete(h.clients, client)
close(client.send)
}
case message := <-h.broadcast:
// matched messages counter is incremented in this thread instead of in multiple http reader
// threads in order to reduce contention.
statsTracker.incMatchedMessages()
for client := range h.clients {
select {
case client.send <- message:
default:
close(client.send)
delete(h.clients, client)
}
}
}
}
}
// serveWs handles websocket requests from the peer.
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)}
client.hub.register <- client
// Allow collection of memory referenced by the caller by doing all work in
// new goroutines.
go client.writePump()
go client.readPump()
}
func startOutputServer(port string, messageCallback func([]byte)) {
hub = newHub(messageCallback)
go hub.run()
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
serveWs(hub, w, r)
})
err := http.ListenAndServe("0.0.0.0:" + port, nil)
if err != nil {
log.Fatal("Output server error: ", err)
}
}
func broadcastReqResPair(reqResJson []byte) {
hub.broadcast <- reqResJson
}
func broadcastOutboundLink(srcIP string, dstIP string, dstPort int) {
cacheKey := fmt.Sprintf("%s -> %s:%d", srcIP, dstIP, dstPort)
_, isInCache := outboundSocketNotifyExpiringCache.Get(cacheKey)
if isInCache {
return
} else {
outboundSocketNotifyExpiringCache.SetDefault(cacheKey, true)
}
socketMessage := OutBoundLinkMessage{
SourceIP: srcIP,
IP: dstIP,
Port: dstPort,
Type: "outboundSocketDetected",
}
jsonStr, err := json.Marshal(socketMessage)
if err != nil {
log.Printf("error marshalling outbound socket detection object: %v", err)
} else {
hub.broadcast <- jsonStr
}
}
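The server above listens on "/" (startOutputServer), broadcasts request/response pairs and outboundSocketDetected notifications to every connected client, and forwards anything a client sends to the callback the tapper registered (onCollectorMessage in the previous hunk, which understands setPorts and setAddresses). A minimal client sketch, assuming the tapper's default port 8080 and assuming the CollectorMessage JSON keys are "messageType"/"ports" (the struct itself is defined outside this compare):

package main

import (
	"fmt"
	"log"

	"github.com/gorilla/websocket"
)

func main() {
	conn, _, err := websocket.DefaultDialer.Dial("ws://localhost:8080/", nil)
	if err != nil {
		log.Fatal("dial:", err)
	}
	defer conn.Close()

	// Tell the tapper which application ports to filter for (JSON keys assumed).
	setPorts := `{"messageType": "setPorts", "ports": [8080, 9090]}`
	if err := conn.WriteMessage(websocket.TextMessage, []byte(setPorts)); err != nil {
		log.Fatal("write:", err)
	}

	// Print request/response pairs and outbound-link notifications as they arrive.
	for {
		_, message, err := conn.ReadMessage()
		if err != nil {
			log.Fatal("read:", err)
		}
		fmt.Printf("%s\n", message)
	}
}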


@@ -34,7 +34,7 @@ type tcpStream struct {
func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassembly.TCPFlowDirection, nextSeq reassembly.Sequence, start *bool, ac reassembly.AssemblerContext) bool {
// FSM
if !t.tcpstate.CheckState(tcp, dir) {
SilentError("FSM-rejection", "%s: Packet rejected by FSM (state:%s)", t.ident, t.tcpstate.String())
//SilentError("FSM", "%s: Packet rejected by FSM (state:%s)\n", t.ident, t.tcpstate.String())
stats.rejectFsm++
if !t.fsmerr {
t.fsmerr = true
@@ -47,7 +47,7 @@ func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassem
// Options
err := t.optchecker.Accept(tcp, ci, dir, nextSeq, start)
if err != nil {
SilentError("OptionChecker-rejection", "%s: Packet rejected by OptionChecker: %s", t.ident, err)
//SilentError("OptionChecker", "%s: Packet rejected by OptionChecker: %s\n", t.ident, err)
stats.rejectOpt++
if !*nooptcheck {
return false
@@ -58,10 +58,10 @@ func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassem
if *checksum {
c, err := tcp.ComputeChecksum()
if err != nil {
SilentError("ChecksumCompute", "%s: Got error computing checksum: %s", t.ident, err)
SilentError("ChecksumCompute", "%s: Got error computing checksum: %s\n", t.ident, err)
accept = false
} else if c != 0x0 {
SilentError("Checksum", "%s: Invalid checksum: 0x%x", t.ident, c)
SilentError("Checksum", "%s: Invalid checksum: 0x%x\n", t.ident, c)
accept = false
}
}
@@ -95,7 +95,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
if sgStats.OverlapBytes != 0 && sgStats.OverlapPackets == 0 {
// In the original example this was handled with panic().
// I don't know what this error means or how to handle it properly.
SilentError("Invalid-Overlap", "bytes:%d, pkts:%d", sgStats.OverlapBytes, sgStats.OverlapPackets)
SilentError("Invalid-Overlap", "bytes:%d, pkts:%d\n", sgStats.OverlapBytes, sgStats.OverlapPackets)
}
stats.overlapBytes += sgStats.OverlapBytes
stats.overlapPackets += sgStats.OverlapPackets
@@ -106,7 +106,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
} else {
ident = fmt.Sprintf("%v %v(%s): ", t.net.Reverse(), t.transport.Reverse(), dir)
}
Debug("%s: SG reassembled packet with %d bytes (start:%v,end:%v,skip:%d,saved:%d,nb:%d,%d,overlap:%d,%d)", ident, length, start, end, skip, saved, sgStats.Packets, sgStats.Chunks, sgStats.OverlapBytes, sgStats.OverlapPackets)
Debug("%s: SG reassembled packet with %d bytes (start:%v,end:%v,skip:%d,saved:%d,nb:%d,%d,overlap:%d,%d)\n", ident, length, start, end, skip, saved, sgStats.Packets, sgStats.Chunks, sgStats.OverlapBytes, sgStats.OverlapPackets)
if skip == -1 && *allowmissinginit {
// this is allowed
} else if skip != 0 {
@@ -125,18 +125,18 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
}
dnsSize := binary.BigEndian.Uint16(data[:2])
missing := int(dnsSize) - len(data[2:])
Debug("dnsSize: %d, missing: %d", dnsSize, missing)
Debug("dnsSize: %d, missing: %d\n", dnsSize, missing)
if missing > 0 {
Info("Missing some bytes: %d", missing)
Info("Missing some bytes: %d\n", missing)
sg.KeepFrom(0)
return
}
p := gopacket.NewDecodingLayerParser(layers.LayerTypeDNS, dns)
err := p.DecodeLayers(data[2:], &decoded)
if err != nil {
SilentError("DNS-parser", "Failed to decode DNS: %v", err)
SilentError("DNS-parser", "Failed to decode DNS: %v\n", err)
} else {
Debug("DNS: %s", gopacket.LayerDump(dns))
Debug("DNS: %s\n", gopacket.LayerDump(dns))
}
if len(data) > 2+int(dnsSize) {
sg.KeepFrom(2 + int(dnsSize))
@@ -144,7 +144,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
} else if t.isHTTP {
if length > 0 {
if *hexdump {
Debug("Feeding http with:%s", hex.Dump(data))
Debug("Feeding http with:\n%s", hex.Dump(data))
}
// This is where we pass the reassembled information onwards
// This channel is read by an httpReader object
@@ -158,7 +158,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
}
func (t *tcpStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool {
Debug("%s: Connection closed", t.ident)
Debug("%s: Connection closed\n", t.ident)
if t.isHTTP {
close(t.client.msgQueue)
close(t.server.msgQueue)


@@ -15,27 +15,24 @@ import (
* Generates a new tcp stream for each new tcp connection. Closes the stream when the connection closes.
*/
type tcpStreamFactory struct {
wg sync.WaitGroup
doHTTP bool
harWriter *HarWriter
outbountLinkWriter *OutboundLinkWriter
wg sync.WaitGroup
doHTTP bool
harWriter *HarWriter
}
func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.TCP, ac reassembly.AssemblerContext) reassembly.Stream {
Debug("* NEW: %s %s", net, transport)
Debug("* NEW: %s %s\n", net, transport)
fsmOptions := reassembly.TCPSimpleFSMOptions{
SupportMissingEstablishment: *allowmissinginit,
}
Debug("Current App Ports: %v", gSettings.filterPorts)
srcIp := net.Src().String()
Debug("Current App Ports: %v\n", appPorts)
dstIp := net.Dst().String()
dstPort := int(tcp.DstPort)
if factory.shouldNotifyOnOutboundLink(dstIp, dstPort) {
factory.outbountLinkWriter.WriteOutboundLink(net.Src().String(), dstIp, dstPort)
broadcastOutboundLink(net.Src().String(), dstIp, dstPort)
}
props := factory.getStreamProps(srcIp, dstIp, dstPort)
isHTTP := props.isTapTarget
isHTTP := factory.shouldTap(dstIp, dstPort)
stream := &tcpStream{
net: net,
transport: transport,
@@ -59,7 +56,6 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
hexdump: *hexdump,
parent: stream,
isClient: true,
isOutgoing: props.isOutgoing,
harWriter: factory.harWriter,
}
stream.server = httpReader{
@@ -73,7 +69,6 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
},
hexdump: *hexdump,
parent: stream,
isOutgoing: props.isOutgoing,
harWriter: factory.harWriter,
}
factory.wg.Add(2)
@@ -88,29 +83,28 @@ func (factory *tcpStreamFactory) WaitGoRoutines() {
factory.wg.Wait()
}
func (factory *tcpStreamFactory) getStreamProps(srcIP string, dstIP string, dstPort int) *streamProps {
func (factory *tcpStreamFactory) shouldTap(dstIP string, dstPort int) bool {
if hostMode {
if inArrayString(gSettings.filterAuthorities, fmt.Sprintf("%s:%d", dstIP, dstPort)) == true {
return &streamProps{isTapTarget: true, isOutgoing: false}
} else if inArrayString(gSettings.filterAuthorities, dstIP) == true {
return &streamProps{isTapTarget: true, isOutgoing: false}
} else if *anydirection && inArrayString(gSettings.filterAuthorities, srcIP) == true {
return &streamProps{isTapTarget: true, isOutgoing: true}
if inArrayString(HostAppAddresses, fmt.Sprintf("%s:%d", dstIP, dstPort)) == true {
return true
} else if inArrayString(HostAppAddresses, dstIP) == true {
return true
}
return &streamProps{isTapTarget: false}
return false
} else {
isTappedPort := dstPort == 80 || (gSettings.filterPorts != nil && (inArrayInt(gSettings.filterPorts, dstPort)))
isTappedPort := dstPort == 80 || (appPorts != nil && (inArrayInt(appPorts, dstPort)))
if !isTappedPort {
return &streamProps{isTapTarget: false, isOutgoing: false}
return false
}
isOutgoing := !inArrayString(ownIps, dstIP)
if !*anydirection && isOutgoing {
return &streamProps{isTapTarget: false, isOutgoing: isOutgoing}
if !*anydirection {
isDirectedHere := inArrayString(ownIps, dstIP)
if !isDirectedHere {
return false
}
}
return &streamProps{isTapTarget: true}
return true
}
}
@@ -121,9 +115,3 @@ func (factory *tcpStreamFactory) shouldNotifyOnOutboundLink(dstIP string, dstPor
}
return true
}
type streamProps struct {
isTapTarget bool
isOutgoing bool
}
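The hunk above collapses getStreamProps into a plain boolean shouldTap. A standalone sketch of that decision with the package-level state passed in explicitly, useful for seeing the two modes side by side; parameter names such as hostAppAddresses and ownIPs stand in for the globals used by the factory:

package main

import "fmt"

// shouldTap mirrors the boolean logic of tcpStreamFactory.shouldTap above.
func shouldTap(hostMode bool, hostAppAddresses []string, appPorts []int, ownIPs []string, anyDirection bool, dstIP string, dstPort int) bool {
	contains := func(xs []string, s string) bool {
		for _, x := range xs {
			if x == s {
				return true
			}
		}
		return false
	}
	if hostMode {
		// In host mode, only traffic addressed to a known tapped pod is kept.
		return contains(hostAppAddresses, fmt.Sprintf("%s:%d", dstIP, dstPort)) ||
			contains(hostAppAddresses, dstIP)
	}
	tapped := dstPort == 80
	for _, p := range appPorts {
		if p == dstPort {
			tapped = true
		}
	}
	if !tapped {
		return false
	}
	if !anyDirection && !contains(ownIPs, dstIP) {
		return false // outgoing traffic is ignored unless any-direction tapping is enabled
	}
	return true
}

func main() {
	// Pod mode: port 80 to a local address is tapped, an outgoing 5432 connection is not.
	fmt.Println(shouldTap(false, nil, []int{8080}, []string{"10.0.0.1"}, false, "10.0.0.1", 80))   // true
	fmt.Println(shouldTap(false, nil, []int{8080}, []string{"10.0.0.1"}, false, "10.0.0.9", 5432)) // false
}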


@@ -70,15 +70,14 @@ func GetResolvedBaseEntry(entry models.MizuEntry) models.BaseEntryDetails {
service = SetHostname(service, entry.ResolvedDestination)
}
return models.BaseEntryDetails{
Id: entry.EntryId,
Url: entryUrl,
Service: service,
Path: entry.Path,
StatusCode: entry.Status,
Method: entry.Method,
Timestamp: entry.Timestamp,
Id: entry.EntryId,
Url: entryUrl,
Service: service,
Path: entry.Path,
StatusCode: entry.Status,
Method: entry.Method,
Timestamp: entry.Timestamp,
RequestSenderIp: entry.RequestSenderIp,
IsOutgoing: entry.IsOutgoing,
}
}


@@ -26,10 +26,10 @@ build-all: ## build for all supported platforms
@mkdir -p bin && echo "SHA256 checksums available for compiled binaries \n\nRun \`shasum -a 256 -c mizu_OS_ARCH.sha256\` to verify\n\n" > bin/README.md
@$(MAKE) build GOOS=darwin GOARCH=amd64
@$(MAKE) build GOOS=linux GOARCH=amd64
@# $(MAKE) build GOOS=darwin GOARCH=arm64
@# $(MAKE) GOOS=windows GOARCH=amd64
@# $(MAKE) GOOS=linux GOARCH=386
@# $(MAKE) GOOS=windows GOARCH=386
@# $(MAKE) GOOS=darwin GOARCH=arm64
@# $(MAKE) GOOS=linux GOARCH=arm64
@# $(MAKE) GOOS=windows GOARCH=arm64
@echo "---------"
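The generated bin/README.md above points users at `shasum -a 256 -c mizu_OS_ARCH.sha256`. For platforms without shasum, the same check is a few lines of Go; the binary name below follows the mizu_OS_ARCH pattern from this Makefile and is only an example:

package main

import (
	"crypto/sha256"
	"fmt"
	"io"
	"log"
	"os"
)

func main() {
	// Hash the downloaded binary and compare the digest with the published .sha256 file.
	f, err := os.Open("mizu_darwin_amd64")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	h := sha256.New()
	if _, err := io.Copy(h, f); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%x  mizu_darwin_amd64\n", h.Sum(nil))
}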


@@ -5,10 +5,8 @@ import (
)
type MizuFetchOptions struct {
FromTimestamp int64
ToTimestamp int64
Directory string
MizuPort uint
Limit uint16
Directory string
}
var mizuFetchOptions = MizuFetchOptions{}
@@ -25,8 +23,6 @@ var fetchCmd = &cobra.Command{
func init() {
rootCmd.AddCommand(fetchCmd)
fetchCmd.Flags().Uint16VarP(&mizuFetchOptions.Limit, "limit", "l", 1000, "Provide a custom limit for entries to fetch")
fetchCmd.Flags().StringVarP(&mizuFetchOptions.Directory, "directory", "d", ".", "Provide a custom directory for fetched entries")
fetchCmd.Flags().Int64Var(&mizuFetchOptions.FromTimestamp, "from", 0, "Custom start timestamp for fetched entries")
fetchCmd.Flags().Int64Var(&mizuFetchOptions.ToTimestamp, "to", 0, "Custom end timestamp fetched entries")
fetchCmd.Flags().UintVarP(&mizuFetchOptions.MizuPort, "port", "p", 8899, "Custom port for mizu")
}


@@ -14,7 +14,7 @@ import (
)
func RunMizuFetch(fetch *MizuFetchOptions) {
resp, err := http.Get(fmt.Sprintf("http://localhost:%v/api/har?from=%v&to=%v", fetch.MizuPort, fetch.FromTimestamp, fetch.ToTimestamp))
resp, err := http.Get(fmt.Sprintf("http://localhost:8899/api/har?limit=%v", fetch.Limit))
if err != nil {
log.Fatal(err)
}
@@ -53,7 +53,7 @@ func Unzip(reader *zip.Reader, dest string) error {
path := filepath.Join(dest, f.Name)
// Check for ZipSlip (Directory traversal)
if !strings.HasPrefix(path, filepath.Clean(dest)+string(os.PathSeparator)) {
if !strings.HasPrefix(path, filepath.Clean(dest) + string(os.PathSeparator)) {
return fmt.Errorf("illegal file path: %s", path)
}
@@ -61,7 +61,7 @@ func Unzip(reader *zip.Reader, dest string) error {
_ = os.MkdirAll(path, f.Mode())
} else {
_ = os.MkdirAll(filepath.Dir(path), f.Mode())
fmt.Print("writing HAR file [ ", path, " ] .. ")
fmt.Print("writing HAR file [ ", path, " ] .. ")
f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode())
if err != nil {
return err
@@ -70,7 +70,7 @@ func Unzip(reader *zip.Reader, dest string) error {
if err := f.Close(); err != nil {
panic(err)
}
fmt.Println(" done")
fmt.Println(" done")
}()
_, err = io.Copy(f, rc)
@@ -90,3 +90,5 @@ func Unzip(reader *zip.Reader, dest string) error {
return nil
}


@@ -3,10 +3,8 @@ package cmd
import (
"errors"
"fmt"
"regexp"
"strings"
"github.com/up9inc/mizu/cli/mizu"
"regexp"
"github.com/spf13/cobra"
)
@@ -15,23 +13,20 @@ type MizuTapOptions struct {
GuiPort uint16
Namespace string
AllNamespaces bool
Analyze bool
AnalyzeDestination string
KubeConfigPath string
MizuImage string
MizuPodPort uint16
PlainTextFilterRegexes []string
TapOutgoing bool
}
var mizuTapOptions = &MizuTapOptions{}
var direction string
var tapCmd = &cobra.Command{
Use: "tap [POD REGEX]",
Short: "Record ingoing traffic of a kubernetes pod",
Long: `Record the ingoing traffic of a kubernetes pod.
Supported protocols are HTTP and gRPC.`,
Supported protocols are HTTP and gRPC.`,
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) == 0 {
return errors.New("POD REGEX argument is required")
@@ -44,15 +39,6 @@ Supported protocols are HTTP and gRPC.`,
return errors.New(fmt.Sprintf("%s is not a valid regex %s", args[0], err))
}
directionLowerCase := strings.ToLower(direction)
if directionLowerCase == "any" {
mizuTapOptions.TapOutgoing = true
} else if directionLowerCase == "in" {
mizuTapOptions.TapOutgoing = false
} else {
return errors.New(fmt.Sprintf("%s is not a valid value for flag --direction. Acceptable values are in/any.", direction))
}
RunMizuTap(regex, mizuTapOptions)
return nil
},
@@ -63,12 +49,9 @@ func init() {
tapCmd.Flags().Uint16VarP(&mizuTapOptions.GuiPort, "gui-port", "p", 8899, "Provide a custom port for the web interface webserver")
tapCmd.Flags().StringVarP(&mizuTapOptions.Namespace, "namespace", "n", "", "Namespace selector")
tapCmd.Flags().BoolVar(&mizuTapOptions.Analyze, "analyze", false, "Uploads traffic to UP9 cloud for further analysis")
tapCmd.Flags().StringVar(&mizuTapOptions.AnalyzeDestination, "dest", "up9.app", "Destination environment")
tapCmd.Flags().BoolVarP(&mizuTapOptions.AllNamespaces, "all-namespaces", "A", false, "Tap all namespaces")
tapCmd.Flags().StringVarP(&mizuTapOptions.KubeConfigPath, "kube-config", "k", "", "Path to kube-config file")
tapCmd.Flags().StringVarP(&mizuTapOptions.MizuImage, "mizu-image", "", fmt.Sprintf("gcr.io/up9-docker-hub/mizu/%s:latest", mizu.Branch), "Custom image for mizu collector")
tapCmd.Flags().Uint16VarP(&mizuTapOptions.MizuPodPort, "mizu-port", "", 8899, "Port which mizu cli will attempt to forward from the mizu collector pod")
tapCmd.Flags().StringArrayVarP(&mizuTapOptions.PlainTextFilterRegexes, "regex-masking", "r", nil, "List of regex expressions that are used to filter matching values from text/plain http bodies")
tapCmd.Flags().StringVarP(&direction, "direction", "", "in", "Record traffic that goes in this direction (relative to the tapped pod): in/any")
}


@@ -2,17 +2,14 @@ package cmd
import (
"context"
"encoding/json"
"fmt"
"net/http"
"github.com/up9inc/mizu/shared"
"os"
"os/signal"
"regexp"
"syscall"
"time"
"github.com/up9inc/mizu/shared"
core "k8s.io/api/core/v1"
"github.com/up9inc/mizu/cli/debounce"
@@ -41,29 +38,12 @@ func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // cancel will be called when this function exits
targetNamespace := getNamespace(tappingOptions, kubernetesProvider)
if matchingPods, err := kubernetesProvider.GetAllPodsMatchingRegex(ctx, podRegexQuery, targetNamespace); err != nil {
if matchingPods, err := kubernetesProvider.GetAllPodsMatchingRegex(ctx, podRegexQuery); err != nil {
return
} else {
currentlyTappedPods = matchingPods
}
var namespacesStr string
if targetNamespace != mizu.K8sAllNamespaces {
namespacesStr = fmt.Sprintf("namespace \"%s\"", targetNamespace)
} else {
namespacesStr = "all namespaces"
}
fmt.Printf("Tapping pods in %s\n", namespacesStr)
if len(currentlyTappedPods) == 0 {
var suggestionStr string
if targetNamespace != mizu.K8sAllNamespaces {
suggestionStr = "\nSelect a different namespace with -n or tap all namespaces with -A"
}
fmt.Printf("Did not find any pods matching the regex argument%s\n", suggestionStr)
}
nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(currentlyTappedPods)
if err != nil {
return
@@ -79,31 +59,8 @@ func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) {
//block until exit signal or error
waitForFinish(ctx, cancel)
}
type GuestToken struct {
Token string `json:"token"`
Model string `json:"model"`
}
func getGuestToken(url string, target *GuestToken) error {
resp, err := http.Get(url)
if err != nil {
return err
}
defer resp.Body.Close()
return json.NewDecoder(resp.Body).Decode(target)
}
func CreateAnonymousToken(envPrefix string) (*GuestToken, error) {
tokenUrl := fmt.Sprintf("https://trcc.%v/anonymous/token", envPrefix)
token := &GuestToken{}
if err := getGuestToken(tokenUrl, token); err != nil {
fmt.Println(err)
return nil, err
}
return token, nil
// TODO handle incoming traffic from tapper using a channel
}
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions, mizuApiFilteringOptions *shared.TrafficFilteringOptions) error {
@@ -111,7 +68,7 @@ func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Pro
return err
}
if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil {
if err := createMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil {
return err
}
@@ -155,27 +112,19 @@ func getMizuApiFilteringOptions(tappingOptions *MizuTapOptions) (*shared.Traffic
return &shared.TrafficFilteringOptions{PlainTextMaskingRegexes: compiledRegexSlice}, nil
}
func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions) error {
if len(nodeToTappedPodIPMap) > 0 {
if err := kubernetesProvider.ApplyMizuTapperDaemonSet(
ctx,
mizu.ResourcesNamespace,
mizu.TapperDaemonSetName,
tappingOptions.MizuImage,
mizu.TapperPodName,
fmt.Sprintf("%s.%s.svc.cluster.local", aggregatorService.Name, aggregatorService.Namespace),
nodeToTappedPodIPMap,
mizuServiceAccountExists,
tappingOptions.TapOutgoing,
); err != nil {
fmt.Printf("Error creating mizu tapper daemonset: %v\n", err)
return err
}
} else {
if err := kubernetesProvider.RemoveDaemonSet(ctx, mizu.ResourcesNamespace, mizu.TapperDaemonSetName); err != nil {
fmt.Printf("Error deleting mizu tapper daemonset: %v\n", err)
return err
}
func createMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions) error {
if err := kubernetesProvider.ApplyMizuTapperDaemonSet(
ctx,
mizu.ResourcesNamespace,
mizu.TapperDaemonSetName,
tappingOptions.MizuImage,
mizu.TapperPodName,
fmt.Sprintf("%s.%s.svc.cluster.local", aggregatorService.Name, aggregatorService.Namespace),
nodeToTappedPodIPMap,
mizuServiceAccountExists,
); err != nil {
fmt.Printf("Error creating mizu tapper daemonset: %v\n", err)
return err
}
return nil
@@ -197,12 +146,10 @@ func cleanUpMizuResources(kubernetesProvider *kubernetes.Provider) {
}
func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, podRegex *regexp.Regexp, tappingOptions *MizuTapOptions) {
targetNamespace := getNamespace(tappingOptions, kubernetesProvider)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, targetNamespace), podRegex)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, getNamespace(tappingOptions, kubernetesProvider)), podRegex)
restartTappers := func() {
if matchingPods, err := kubernetesProvider.GetAllPodsMatchingRegex(ctx, podRegex, targetNamespace); err != nil {
if matchingPods, err := kubernetesProvider.GetAllPodsMatchingRegex(ctx, podRegex); err != nil {
fmt.Printf("Error getting pods by regex: %s (%v,%+v)\n", err, err, err)
cancel()
} else {
@@ -215,7 +162,7 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
cancel()
}
if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil {
if err := createMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil {
fmt.Printf("Error updating daemonset: %s (%v,%+v)\n", err, err, err)
cancel()
}
@@ -268,22 +215,12 @@ func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
case modifiedPod := <-modified:
if modifiedPod.Status.Phase == "Running" && !isPodReady {
isPodReady = true
var portForwardCreateError error
if portForward, portForwardCreateError = kubernetes.NewPortForward(kubernetesProvider, mizu.ResourcesNamespace, mizu.AggregatorPodName, tappingOptions.GuiPort, tappingOptions.MizuPodPort, cancel); portForwardCreateError != nil {
fmt.Printf("error forwarding port to pod %s\n", portForwardCreateError)
var err error
portForward, err = kubernetes.NewPortForward(kubernetesProvider, mizu.ResourcesNamespace, mizu.AggregatorPodName, tappingOptions.GuiPort, tappingOptions.MizuPodPort, cancel)
fmt.Printf("Web interface is now available at http://localhost:%d\n", tappingOptions.GuiPort)
if err != nil {
fmt.Printf("error forwarding port to pod %s\n", err)
cancel()
} else {
fmt.Printf("Web interface is now available at http://localhost:%d\n", tappingOptions.GuiPort)
if tappingOptions.Analyze {
token, _ := CreateAnonymousToken(tappingOptions.AnalyzeDestination)
if _, err := http.Get(fmt.Sprintf("http://localhost:%d/api/uploadEntries?token=%s&model=%s&dest=%s", tappingOptions.GuiPort, token.Token, token.Model, tappingOptions.AnalyzeDestination)); err != nil {
fmt.Println(err)
} else {
fmt.Println("Staring to upload and analyze the data, it may take a few minutes")
fmt.Println("https://" + tappingOptions.AnalyzeDestination + "/share/" + token.Token)
}
}
}
}


@@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/up9inc/mizu/cli/mizu"
"path/filepath"
"regexp"
@@ -102,6 +103,7 @@ func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace
},
DNSPolicy: core.DNSClusterFirstWithHostNet,
TerminationGracePeriodSeconds: new(int64),
// Affinity: TODO: define node selector for all relevant nodes for this mizu instance
},
}
//define the service account only when it exists to prevent pod crash
@@ -214,117 +216,30 @@ func (provider *Provider) CreateMizuRBAC(ctx context.Context, namespace string,
}
func (provider *Provider) RemovePod(ctx context.Context, namespace string, podName string) error {
if isFound, err := provider.CheckPodExists(ctx, namespace, podName);
err != nil {
return err
} else if !isFound {
return nil
}
return provider.clientSet.CoreV1().Pods(namespace).Delete(ctx, podName, metav1.DeleteOptions{})
}
func (provider *Provider) RemoveService(ctx context.Context, namespace string, serviceName string) error {
if isFound, err := provider.CheckServiceExists(ctx, namespace, serviceName);
err != nil {
return err
} else if !isFound {
return nil
}
return provider.clientSet.CoreV1().Services(namespace).Delete(ctx, serviceName, metav1.DeleteOptions{})
}
func (provider *Provider) RemoveDaemonSet(ctx context.Context, namespace string, daemonSetName string) error {
if isFound, err := provider.CheckDaemonSetExists(ctx, namespace, daemonSetName);
err != nil {
return err
} else if !isFound {
return nil
}
return provider.clientSet.AppsV1().DaemonSets(namespace).Delete(ctx, daemonSetName, metav1.DeleteOptions{})
}
func (provider *Provider) CheckPodExists(ctx context.Context, namespace string, name string) (bool, error) {
listOptions := metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", name),
Limit: 1,
}
resourceList, err := provider.clientSet.CoreV1().Pods(namespace).List(ctx, listOptions)
if err != nil {
return false, err
}
if len(resourceList.Items) > 0 {
return true, nil
}
return false, nil
}
func (provider *Provider) CheckServiceExists(ctx context.Context, namespace string, name string) (bool, error) {
listOptions := metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", name),
Limit: 1,
}
resourceList, err := provider.clientSet.CoreV1().Services(namespace).List(ctx, listOptions)
if err != nil {
return false, err
}
if len(resourceList.Items) > 0 {
return true, nil
}
return false, nil
}
func (provider *Provider) CheckDaemonSetExists(ctx context.Context, namespace string, name string) (bool, error) {
listOptions := metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", name),
Limit: 1,
}
resourceList, err := provider.clientSet.AppsV1().DaemonSets(namespace).List(ctx, listOptions)
if err != nil {
return false, err
}
if len(resourceList.Items) > 0 {
return true, nil
}
return false, nil
}
func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, aggregatorPodIp string, nodeToTappedPodIPMap map[string][]string, linkServiceAccount bool, tapOutgoing bool) error {
if len(nodeToTappedPodIPMap) == 0 {
return fmt.Errorf("Daemon set %s must tap at least 1 pod", daemonSetName)
}
func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, aggregatorPodIp string, nodeToTappedPodIPMap map[string][]string, linkServiceAccount bool) error {
nodeToTappedPodIPMapJsonStr, err := json.Marshal(nodeToTappedPodIPMap)
if err != nil {
return err
}
mizuCmd := []string{
"./mizuagent",
"-i", "any",
"--tap",
"--hardump",
"--aggregator-address", fmt.Sprintf("ws://%s/wsTapper", aggregatorPodIp),
}
if tapOutgoing {
mizuCmd = append(mizuCmd, "--anydirection")
}
privileged := true
agentContainer := applyconfcore.Container()
agentContainer.WithName(tapperPodName)
agentContainer.WithImage(podImage)
agentContainer.WithImagePullPolicy(core.PullAlways)
agentContainer.WithSecurityContext(applyconfcore.SecurityContext().WithPrivileged(privileged))
agentContainer.WithCommand(mizuCmd...)
agentContainer.WithCommand("./mizuagent", "-i", "any", "--tap", "--hardump", "--aggregator-address", fmt.Sprintf("ws://%s/wsTapper", aggregatorPodIp))
agentContainer.WithEnv(
applyconfcore.EnvVar().WithName(shared.HostModeEnvVar).WithValue("1"),
applyconfcore.EnvVar().WithName(shared.TappedAddressesPerNodeDictEnvVar).WithValue(string(nodeToTappedPodIPMapJsonStr)),
@@ -386,8 +301,8 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
return err
}
func (provider *Provider) GetAllPodsMatchingRegex(ctx context.Context, regex *regexp.Regexp, namespace string) ([]core.Pod, error) {
pods, err := provider.clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
func (provider *Provider) GetAllPodsMatchingRegex(ctx context.Context, regex *regexp.Regexp) ([]core.Pod, error) {
pods, err := provider.clientSet.CoreV1().Pods(mizu.K8sAllNamespaces).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, err
}


@@ -3,7 +3,7 @@ package shared
type WebSocketMessageType string
const (
WebSocketMessageTypeEntry WebSocketMessageType = "entry"
WebSocketMessageTypeTappedEntry WebSocketMessageType = "tappedEntry"
WebSocketMessageTypeTappedEntry WebSocketMessageType = "tappedEntry"
WebSocketMessageTypeUpdateStatus WebSocketMessageType = "status"
)


@@ -1,12 +0,0 @@
module github.com/up9inc/mizu/tap
go 1.16
require (
github.com/google/gopacket v1.1.19
github.com/google/martian v2.1.0+incompatible
github.com/gorilla/websocket v1.4.2
github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231
github.com/patrickmn/go-cache v2.1.0+incompatible
golang.org/x/net v0.0.0-20210421230115-4e50805a0758
)


@@ -1,31 +0,0 @@
github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8=
github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo=
github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/orcaman/concurrent-map v0.0.0-20210501183033-44dafcb38ecc h1:Ak86L+yDSOzKFa7WM5bf5itSOo1e3Xh8bm5YCMUXIjQ=
github.com/orcaman/concurrent-map v0.0.0-20210501183033-44dafcb38ecc/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5 h1:wjuX4b5yYQnEQHzd+CBcrcC6OVR2J1CN6mUy0oSxIPo=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da h1:b3NXsE2LusjYGGjL5bxEVZZORm/YEFFrWFjR8eFrw/c=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=


@@ -1,122 +0,0 @@
package tap
import (
"fmt"
"net/http"
"strings"
"time"
"github.com/orcaman/concurrent-map"
)
type requestResponsePair struct {
Request httpMessage `json:"request"`
Response httpMessage `json:"response"`
}
type httpMessage struct {
isRequest bool
captureTime time.Time
orig interface{}
}
// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port}
type requestResponseMatcher struct {
openMessagesMap cmap.ConcurrentMap
}
func createResponseRequestMatcher() requestResponseMatcher {
newMatcher := &requestResponseMatcher{openMessagesMap: cmap.New()}
return *newMatcher
}
func (matcher *requestResponseMatcher) registerRequest(ident string, request *http.Request, captureTime time.Time) *requestResponsePair {
split := splitIdent(ident)
key := genKey(split)
requestHTTPMessage := httpMessage{
isRequest: true,
captureTime: captureTime,
orig: request,
}
if response, found := matcher.openMessagesMap.Pop(key); found {
// Type assertion always succeeds because all of the map's values are of httpMessage type
responseHTTPMessage := response.(*httpMessage)
if responseHTTPMessage.isRequest {
SilentError("Request-Duplicate", "Got duplicate request with same identifier")
return nil
}
Debug("Matched open Response for %s", key)
return matcher.preparePair(&requestHTTPMessage, responseHTTPMessage)
}
matcher.openMessagesMap.Set(key, &requestHTTPMessage)
Debug("Registered open Request for %s", key)
return nil
}
func (matcher *requestResponseMatcher) registerResponse(ident string, response *http.Response, captureTime time.Time) *requestResponsePair {
split := splitIdent(ident)
key := genKey(split)
responseHTTPMessage := httpMessage{
isRequest: false,
captureTime: captureTime,
orig: response,
}
if request, found := matcher.openMessagesMap.Pop(key); found {
// Type assertion always succeeds because all of the map's values are of httpMessage type
requestHTTPMessage := request.(*httpMessage)
if !requestHTTPMessage.isRequest {
SilentError("Response-Duplicate", "Got duplicate response with same identifier")
return nil
}
Debug("Matched open Request for %s", key)
return matcher.preparePair(requestHTTPMessage, &responseHTTPMessage)
}
matcher.openMessagesMap.Set(key, &responseHTTPMessage)
Debug("Registered open Response for %s", key)
return nil
}
func (matcher *requestResponseMatcher) preparePair(requestHTTPMessage *httpMessage, responseHTTPMessage *httpMessage) *requestResponsePair {
return &requestResponsePair{
Request: *requestHTTPMessage,
Response: *responseHTTPMessage,
}
}
func splitIdent(ident string) []string {
ident = strings.Replace(ident, "->", " ", -1)
return strings.Split(ident, " ")
}
func genKey(split []string) string {
key := fmt.Sprintf("%s:%s->%s:%s,%s", split[0], split[2], split[1], split[3], split[4])
return key
}
func (matcher *requestResponseMatcher) deleteOlderThan(t time.Time) int {
keysToPop := make([]string, 0)
for item := range matcher.openMessagesMap.IterBuffered() {
// Map only contains values of type httpMessage
message, _ := item.Val.(*httpMessage)
if message.captureTime.Before(t) {
keysToPop = append(keysToPop, item.Key)
}
}
numDeleted := len(keysToPop)
for _, key := range keysToPop {
_, _ = matcher.openMessagesMap.Pop(key)
}
return numDeleted
}


@@ -1,29 +0,0 @@
package tap
type OutboundLink struct {
Src string
DstIP string
DstPort int
}
func NewOutboundLinkWriter() *OutboundLinkWriter {
return &OutboundLinkWriter{
OutChan: make(chan *OutboundLink),
}
}
type OutboundLinkWriter struct {
OutChan chan *OutboundLink
}
func (olw *OutboundLinkWriter) WriteOutboundLink(src string, DstIP string, DstPort int) {
olw.OutChan <- &OutboundLink{
Src: src,
DstIP: DstIP,
DstPort: DstPort,
}
}
func (olw *OutboundLinkWriter) Stop() {
close(olw.OutChan)
}


@@ -1,31 +0,0 @@
package tap
type globalSettings struct {
filterPorts []int
filterAuthorities []string
}
var gSettings = &globalSettings{
filterPorts: []int{},
filterAuthorities: []string{},
}
func SetFilterPorts(ports []int) {
gSettings.filterPorts = ports
}
func GetFilterPorts() []int {
ports := make([]int, len(gSettings.filterPorts))
copy(ports, gSettings.filterPorts)
return ports
}
func SetFilterAuthorities(ipAddresses []string) {
gSettings.filterAuthorities = ipAddresses
}
func GetFilterIPs() []string {
addresses := make([]string, len(gSettings.filterAuthorities))
copy(addresses, gSettings.filterAuthorities)
return addresses
}


@@ -1,13 +1,7 @@
import React from "react";
import styles from './style/HarEntry.module.sass';
import StatusCode, {getClassification, StatusCodeClassification} from "./StatusCode";
import StatusCode from "./StatusCode";
import {EndpointPath} from "./EndpointPath";
import ingoingIconSuccess from "./assets/ingoing-traffic-success.svg"
import ingoingIconFailure from "./assets/ingoing-traffic-failure.svg"
import ingoingIconNeutral from "./assets/ingoing-traffic-neutral.svg"
import outgoingIconSuccess from "./assets/outgoing-traffic-success.svg"
import outgoingIconFailure from "./assets/outgoing-traffic-failure.svg"
import outgoingIconNeutral from "./assets/outgoing-traffic-neutral.svg"
interface HAREntry {
method?: string,
@@ -18,7 +12,6 @@ interface HAREntry {
url?: string;
isCurrentRevision?: boolean;
timestamp: Date;
isOutgoing?: boolean;
}
interface HAREntryProps {
@@ -28,26 +21,6 @@ interface HAREntryProps {
}
export const HarEntry: React.FC<HAREntryProps> = ({entry, setFocusedEntryId, isSelected}) => {
const classification = getClassification(entry.statusCode)
let ingoingIcon;
let outgoingIcon;
switch(classification) {
case StatusCodeClassification.SUCCESS: {
ingoingIcon = ingoingIconSuccess;
outgoingIcon = outgoingIconSuccess;
break;
}
case StatusCodeClassification.FAILURE: {
ingoingIcon = ingoingIconFailure;
outgoingIcon = outgoingIconFailure;
break;
}
case StatusCodeClassification.NEUTRAL: {
ingoingIcon = ingoingIconNeutral;
outgoingIcon = outgoingIconNeutral;
break;
}
}
return <>
<div id={entry.id} className={`${styles.row} ${isSelected ? styles.rowSelected : ''}`} onClick={() => setFocusedEntryId(entry.id)}>
@@ -60,14 +33,7 @@ export const HarEntry: React.FC<HAREntryProps> = ({entry, setFocusedEntryId, isS
{entry.service}
</div>
</div>
<div className={styles.directionContainer}>
{entry.isOutgoing ?
<img src={outgoingIcon} alt="outgoing traffic" title="outgoing"/>
:
<img src={ingoingIcon} alt="ingoing traffic" title="ingoing"/>
}
</div>
<div className={styles.timestamp}>{new Date(+entry.timestamp)?.toLocaleString()}</div>
</div>
</>
};
};


@@ -1,7 +1,7 @@
import React from "react";
import styles from './style/StatusCode.module.sass';
export enum StatusCodeClassification {
enum StatusCodeClassification {
SUCCESS = "success",
FAILURE = "failure",
NEUTRAL = "neutral"
@@ -14,12 +14,6 @@ interface HAREntryProps {
const StatusCode: React.FC<HAREntryProps> = ({statusCode}) => {
const classification = getClassification(statusCode)
return <span className={`${styles[classification]} ${styles.base}`}>{statusCode}</span>
};
export function getClassification(statusCode: number): string {
let classification = StatusCodeClassification.NEUTRAL;
if (statusCode >= 200 && statusCode <= 399) {
@@ -28,7 +22,7 @@ export function getClassification(statusCode: number): string {
classification = StatusCodeClassification.FAILURE;
}
return classification
}
return <span className={`${styles[classification]} ${styles.base}`}>{statusCode}</span>
};
export default StatusCode;
export default StatusCode;


@@ -1,5 +0,0 @@
<svg width="20" height="20" viewBox="0 0 20 20" fill="none" xmlns="http://www.w3.org/2000/svg">
<path fill-rule="evenodd" clip-rule="evenodd" d="M16.5175 11.1465C16.8392 10.8869 17 10.4434 17 10C17 9.55657 16.8392 9.11314 16.5175 8.85348L12.5425 5.64459C13.2682 5.23422 14.1067 5 15 5C17.7614 5 20 7.23858 20 10C20 12.7614 17.7614 15 15 15C14.1067 15 13.2682 14.7658 12.5425 14.3554L16.5175 11.1465Z" fill="#BCCEFD"/>
<path d="M16 10C16 10.3167 15.8749 10.6335 15.6247 10.8189L10.1706 14.8624C9.65543 15.2444 9 14.7858 9 14.0435V5.95652C9 5.21417 9.65543 4.75564 10.1706 5.13758L15.6247 9.18106C15.8749 9.36653 16 9.68326 16 10Z" fill="#EB5757"/>
<path d="M0 10C0 8.89543 0.895431 8 2 8H10C11.1046 8 12 8.89543 12 10C12 11.1046 11.1046 12 10 12H2C0.895431 12 0 11.1046 0 10Z" fill="#EB5757"/>
</svg>


@@ -1,5 +0,0 @@
<svg width="20" height="20" viewBox="0 0 20 20" fill="none" xmlns="http://www.w3.org/2000/svg">
<path fill-rule="evenodd" clip-rule="evenodd" d="M16.5175 11.1465C16.8392 10.8869 17 10.4434 17 10C17 9.55657 16.8392 9.11314 16.5175 8.85348L12.5425 5.64459C13.2682 5.23422 14.1067 5 15 5C17.7614 5 20 7.23858 20 10C20 12.7614 17.7614 15 15 15C14.1067 15 13.2682 14.7658 12.5425 14.3554L16.5175 11.1465Z" fill="#BCCEFD"/>
<path d="M16 10C16 10.3167 15.8749 10.6335 15.6247 10.8189L10.1706 14.8624C9.65543 15.2444 9 14.7858 9 14.0435V5.95652C9 5.21417 9.65543 4.75564 10.1706 5.13758L15.6247 9.18106C15.8749 9.36653 16 9.68326 16 10Z" fill="gray"/>
<path d="M0 10C0 8.89543 0.895431 8 2 8H10C11.1046 8 12 8.89543 12 10C12 11.1046 11.1046 12 10 12H2C0.895431 12 0 11.1046 0 10Z" fill="gray"/>
</svg>


@@ -1,5 +0,0 @@
<svg width="20" height="20" viewBox="0 0 20 20" fill="none" xmlns="http://www.w3.org/2000/svg">
<path fill-rule="evenodd" clip-rule="evenodd" d="M16.5175 11.1465C16.8392 10.8869 17 10.4434 17 10C17 9.55657 16.8392 9.11314 16.5175 8.85348L12.5425 5.64459C13.2682 5.23422 14.1067 5 15 5C17.7614 5 20 7.23858 20 10C20 12.7614 17.7614 15 15 15C14.1067 15 13.2682 14.7658 12.5425 14.3554L16.5175 11.1465Z" fill="#BCCEFD"/>
<path d="M16 10C16 10.3167 15.8749 10.6335 15.6247 10.8189L10.1706 14.8624C9.65543 15.2444 9 14.7858 9 14.0435V5.95652C9 5.21417 9.65543 4.75564 10.1706 5.13758L15.6247 9.18106C15.8749 9.36653 16 9.68326 16 10Z" fill="#27AE60"/>
<path d="M0 10C0 8.89543 0.895431 8 2 8H10C11.1046 8 12 8.89543 12 10C12 11.1046 11.1046 12 10 12H2C0.895431 12 0 11.1046 0 10Z" fill="#27AE60"/>
</svg>


@@ -1,5 +0,0 @@
<svg width="20" height="20" viewBox="0 0 20 20" fill="none" xmlns="http://www.w3.org/2000/svg">
<path fill-rule="evenodd" clip-rule="evenodd" d="M15 15C17.7614 15 20 12.7615 20 10C20 7.23861 17.7614 5.00003 15 5.00003C13.3642 5.00003 11.9118 5.78558 10.9996 7.00003H14C15.6569 7.00003 17 8.34318 17 10C17 11.6569 15.6569 13 14 13H10.9996C11.9118 14.2145 13.3642 15 15 15Z" fill="#BCCEFD"/>
<rect x="4" y="8.00003" width="12" height="4" rx="2" fill="#EB5757"/>
<path d="M5.96244e-08 10C6.34015e-08 9.68329 0.125088 9.36656 0.375266 9.18109L5.82939 5.13761C6.34457 4.75567 7 5.2142 7 5.95655L7 14.0435C7 14.7859 6.34457 15.2444 5.82939 14.8625L0.375266 10.819C0.125088 10.6335 5.58474e-08 10.3168 5.96244e-08 10Z" fill="#EB5757"/>
</svg>


@@ -1,5 +0,0 @@
<svg width="20" height="20" viewBox="0 0 20 20" fill="none" xmlns="http://www.w3.org/2000/svg">
<path fill-rule="evenodd" clip-rule="evenodd" d="M15 15C17.7614 15 20 12.7615 20 10C20 7.23861 17.7614 5.00003 15 5.00003C13.3642 5.00003 11.9118 5.78558 10.9996 7.00003H14C15.6569 7.00003 17 8.34318 17 10C17 11.6569 15.6569 13 14 13H10.9996C11.9118 14.2145 13.3642 15 15 15Z" fill="#BCCEFD"/>
<rect x="4" y="8.00003" width="12" height="4" rx="2" fill="gray"/>
<path d="M5.96244e-08 10C6.34015e-08 9.68329 0.125088 9.36656 0.375266 9.18109L5.82939 5.13761C6.34457 4.75567 7 5.2142 7 5.95655L7 14.0435C7 14.7859 6.34457 15.2444 5.82939 14.8625L0.375266 10.819C0.125088 10.6335 5.58474e-08 10.3168 5.96244e-08 10Z" fill="gray"/>
</svg>


@@ -1,5 +0,0 @@
<svg width="20" height="20" viewBox="0 0 20 20" fill="none" xmlns="http://www.w3.org/2000/svg">
<path fill-rule="evenodd" clip-rule="evenodd" d="M15 15C17.7614 15 20 12.7615 20 10C20 7.23861 17.7614 5.00003 15 5.00003C13.3642 5.00003 11.9118 5.78558 10.9996 7.00003H14C15.6569 7.00003 17 8.34318 17 10C17 11.6569 15.6569 13 14 13H10.9996C11.9118 14.2145 13.3642 15 15 15Z" fill="#BCCEFD"/>
<rect x="4" y="8.00003" width="12" height="4" rx="2" fill="#27AE60"/>
<path d="M5.96244e-08 10C6.34015e-08 9.68329 0.125088 9.36656 0.375266 9.18109L5.82939 5.13761C6.34457 4.75567 7 5.2142 7 5.95655L7 14.0435C7 14.7859 6.34457 15.2444 5.82939 14.8625L0.375266 10.819C0.125088 10.6335 5.58474e-08 10.3168 5.96244e-08 10Z" fill="#27AE60"/>
</svg>


@@ -37,10 +37,9 @@
.timestamp
font-size: 12px
color: $secondary-font-color
padding-left: 12px
padding-left: 8px
padding-right: 8px
flex-shrink: 0
width: 145px
text-align: left
.endpointServiceContainer
display: flex
@@ -48,10 +47,4 @@
overflow: hidden
padding-right: 10px
padding-left: 10px
flex-grow: 1
.directionContainer
display: flex
border-right: 1px solid $data-background-color
padding: 4px
padding-right: 12px
flex-grow: 1


@@ -19,4 +19,4 @@ $blue-gray: #494677;
successColor: $success-color;
failureColor: $failure-color;
blueGray: $blue-gray;
}
}