Compare commits

..

2 Commits

Author SHA1 Message Date
M. Mert Yıldıran
a42a0cd0b9 Improve request-response matching and fix some interface conversion errors (#778)
* Add a PCAP based testbed

* Fix typos

* Download PCAPs from the Google Cloud bucket

* Add a Python script to automate the PCAP testbed

* Dump the test suite into a file named `suite.json`

* Serialize entries separately

* Dissect individual TCP streams one by one through separate PCAP files

* Improve the reliability a little bit

* Ditch the individual TCP streams idea

* Fix some issues in Kafka

* Print the total number of packets and TCP streams

* Fix an interface conversion error in AMQP

* Print the total number of returning items from the dissectors

* Print the total number of returning items from the dissectors really

* Fix a possible race condition

* Do atomic increments just to be sure

* Print the total number of Redis `Dissect` calls

* Improve the request-response matching in Redis by including the TCP stream ID

* Update the request-response pair matching key format in HTTP and Kafka

* Rearrange the test suite

* Add more queries to the test suite

* Remove the debug prints

* Add the assertions

* Close the WebSocket connection faster

* Make `MIZU_TEST` enviroment variable a shared constant

* Add `test-lint` rule

* Fix several issues in Kafka

* Update the test suite

* Add more queries

* Fix the `test-lint` rule

* Exit only after PCAP EOF

* Add more queries

* Update `suite.json`

* Make the tests more stable

* Revert the bad changes

* Run `go mod tidy` on `tap/`
2022-02-08 21:32:27 +03:00
RamiBerm
145004fe43 TRA-4263 revert extensible routing (#774)
* Update main.go, extensions.go, and 2 more files...

* Update config_routes.go, entries_routes.go, and 7 more files...
2022-02-08 17:48:30 +02:00
23 changed files with 327 additions and 417 deletions

View File

@@ -5,13 +5,24 @@ import (
"errors"
"flag"
"fmt"
"io/ioutil"
"net/http"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
"github.com/gin-contrib/static"
"github.com/gin-gonic/gin"
"github.com/up9inc/mizu/agent/pkg/elastic"
"github.com/up9inc/mizu/agent/pkg/middlewares"
"github.com/up9inc/mizu/agent/pkg/models"
"github.com/up9inc/mizu/agent/pkg/oas"
"github.com/up9inc/mizu/agent/pkg/routes"
"github.com/up9inc/mizu/agent/pkg/servicemap"
"github.com/up9inc/mizu/agent/pkg/up9"
"github.com/up9inc/mizu/agent/pkg/utils"
"github.com/up9inc/mizu/agent/pkg/api"
@@ -35,6 +46,7 @@ var apiServerAddress = flag.String("api-server-address", "", "Address of mizu AP
var namespace = flag.String("namespace", "", "Resolve IPs if they belong to resources in this namespace (default is all)")
var harsReaderMode = flag.Bool("hars-read", false, "Run in hars-read mode")
var harsDir = flag.String("hars-dir", "", "Directory to read hars from")
var startTime int64
const (
socketConnectionRetries = 30
@@ -60,7 +72,7 @@ func main() {
} else if *tapperMode {
runInTapperMode()
} else if *apiServerMode {
utils.StartServer(app.RunInApiServerMode(*namespace))
utils.StartServer(runInApiServerMode(*namespace))
} else if *harsReaderMode {
runInHarReaderMode()
}
@@ -72,6 +84,77 @@ func main() {
logger.Log.Info("Exiting")
}
func hostApi(socketHarOutputChannel chan<- *tapApi.OutputChannelItem) *gin.Engine {
app := gin.Default()
app.GET("/echo", func(c *gin.Context) {
c.String(http.StatusOK, "Here is Mizu agent")
})
eventHandlers := api.RoutesEventHandlers{
SocketOutChannel: socketHarOutputChannel,
}
app.Use(disableRootStaticCache())
var staticFolder string
if config.Config.StandaloneMode {
staticFolder = "./site-standalone"
} else {
staticFolder = "./site"
}
indexStaticFile := staticFolder + "/index.html"
if err := setUIFlags(indexStaticFile); err != nil {
logger.Log.Errorf("Error setting ui flags, err: %v", err)
}
app.Use(static.ServeRoot("/", staticFolder))
app.NoRoute(func(c *gin.Context) {
c.File(indexStaticFile)
})
app.Use(middlewares.CORSMiddleware()) // This has to be called after the static middleware, does not work if its called before
api.WebSocketRoutes(app, &eventHandlers, startTime)
if config.Config.StandaloneMode {
routes.ConfigRoutes(app)
routes.UserRoutes(app)
routes.InstallRoutes(app)
}
if config.Config.OAS {
routes.OASRoutes(app)
}
if config.Config.ServiceMap {
routes.ServiceMapRoutes(app)
}
routes.QueryRoutes(app)
routes.EntriesRoutes(app)
routes.MetadataRoutes(app)
routes.StatusRoutes(app)
return app
}
func runInApiServerMode(namespace string) *gin.Engine {
app.ConfigureBasenineServer(shared.BasenineHost, shared.BaseninePort)
startTime = time.Now().UnixNano() / int64(time.Millisecond)
api.StartResolving(namespace)
enableExpFeatureIfNeeded()
syncEntriesConfig := getSyncEntriesConfig()
if syncEntriesConfig != nil {
if err := up9.SyncEntries(syncEntriesConfig); err != nil {
logger.Log.Error("Error syncing entries, err: %v", err)
}
}
return hostApi(app.GetEntryInputChannel())
}
func runInTapperMode() {
logger.Log.Infof("Starting tapper, websocket address: %s", *apiServerAddress)
if *apiServerAddress == "" {
@@ -113,7 +196,7 @@ func runInStandaloneMode() {
go app.FilterItems(outputItemsChannel, filteredOutputItemsChannel)
go api.StartReadingEntries(filteredOutputItemsChannel, nil, app.ExtensionsMap)
ginApp := app.HostApi(nil)
ginApp := hostApi(nil)
utils.StartServer(ginApp)
}
@@ -123,10 +206,63 @@ func runInHarReaderMode() {
go app.FilterItems(outputItemsChannel, filteredHarChannel)
go api.StartReadingEntries(filteredHarChannel, harsDir, app.ExtensionsMap)
ginApp := app.HostApi(nil)
ginApp := hostApi(nil)
utils.StartServer(ginApp)
}
func enableExpFeatureIfNeeded() {
if config.Config.OAS {
oas.GetOasGeneratorInstance().Start()
}
if config.Config.ServiceMap {
servicemap.GetInstance().SetConfig(config.Config)
}
elastic.GetInstance().Configure(config.Config.Elastic)
}
func getSyncEntriesConfig() *shared.SyncEntriesConfig {
syncEntriesConfigJson := os.Getenv(shared.SyncEntriesConfigEnvVar)
if syncEntriesConfigJson == "" {
return nil
}
var syncEntriesConfig = &shared.SyncEntriesConfig{}
err := json.Unmarshal([]byte(syncEntriesConfigJson), syncEntriesConfig)
if err != nil {
panic(fmt.Sprintf("env var %s's value of %s is invalid! json must match the shared.SyncEntriesConfig struct, err: %v", shared.SyncEntriesConfigEnvVar, syncEntriesConfigJson, err))
}
return syncEntriesConfig
}
func disableRootStaticCache() gin.HandlerFunc {
return func(c *gin.Context) {
if c.Request.RequestURI == "/" {
// Disable cache only for the main static route
c.Writer.Header().Set("Cache-Control", "no-store")
}
c.Next()
}
}
func setUIFlags(uiIndexPath string) error {
read, err := ioutil.ReadFile(uiIndexPath)
if err != nil {
return err
}
replacedContent := strings.Replace(string(read), "__IS_OAS_ENABLED__", strconv.FormatBool(config.Config.OAS), 1)
replacedContent = strings.Replace(replacedContent, "__IS_SERVICE_MAP_ENABLED__", strconv.FormatBool(config.Config.ServiceMap), 1)
err = ioutil.WriteFile(uiIndexPath, []byte(replacedContent), 0)
if err != nil {
return err
}
return nil
}
func parseEnvVar(env string) map[string][]v1.Pod {
var mapOfList map[string][]v1.Pod

View File

@@ -1,12 +1,18 @@
package app
import (
"fmt"
"sort"
"time"
"github.com/antelman107/net-wait-go/wait"
"github.com/op/go-logging"
basenine "github.com/up9inc/basenine/client/go"
"github.com/up9inc/mizu/agent/pkg/api"
"github.com/up9inc/mizu/agent/pkg/config"
"github.com/up9inc/mizu/agent/pkg/controllers"
"github.com/up9inc/mizu/shared/logger"
tapApi "github.com/up9inc/mizu/tap/api"
amqpExt "github.com/up9inc/mizu/tap/extensions/amqp"
httpExt "github.com/up9inc/mizu/tap/extensions/http"
kafkaExt "github.com/up9inc/mizu/tap/extensions/kafka"
@@ -60,3 +66,51 @@ func LoadExtensions() {
controllers.InitExtensionsMap(ExtensionsMap)
}
func ConfigureBasenineServer(host string, port string) {
if !wait.New(
wait.WithProto("tcp"),
wait.WithWait(200*time.Millisecond),
wait.WithBreak(50*time.Millisecond),
wait.WithDeadline(5*time.Second),
wait.WithDebug(config.Config.LogLevel == logging.DEBUG),
).Do([]string{fmt.Sprintf("%s:%s", host, port)}) {
logger.Log.Panicf("Basenine is not available!")
}
// Limit the database size to default 200MB
err := basenine.Limit(host, port, config.Config.MaxDBSizeBytes)
if err != nil {
logger.Log.Panicf("Error while limiting database size: %v", err)
}
// Define the macros
for _, extension := range Extensions {
macros := extension.Dissector.Macros()
for macro, expanded := range macros {
err = basenine.Macro(host, port, macro, expanded)
if err != nil {
logger.Log.Panicf("Error while adding a macro: %v", err)
}
}
}
}
func GetEntryInputChannel() chan *tapApi.OutputChannelItem {
outputItemsChannel := make(chan *tapApi.OutputChannelItem)
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
go FilterItems(outputItemsChannel, filteredOutputItemsChannel)
go api.StartReadingEntries(filteredOutputItemsChannel, nil, ExtensionsMap)
return outputItemsChannel
}
func FilterItems(inChannel <-chan *tapApi.OutputChannelItem, outChannel chan *tapApi.OutputChannelItem) {
for message := range inChannel {
if message.ConnectionInfo.IsOutgoing && api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) {
continue
}
outChannel <- message
}
}

View File

@@ -1,210 +0,0 @@
package app
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"strconv"
"strings"
"time"
"github.com/antelman107/net-wait-go/wait"
"github.com/gin-contrib/static"
"github.com/gin-gonic/gin"
"github.com/op/go-logging"
basenine "github.com/up9inc/basenine/client/go"
"github.com/up9inc/mizu/agent/pkg/api"
"github.com/up9inc/mizu/agent/pkg/config"
"github.com/up9inc/mizu/agent/pkg/elastic"
"github.com/up9inc/mizu/agent/pkg/middlewares"
"github.com/up9inc/mizu/agent/pkg/oas"
"github.com/up9inc/mizu/agent/pkg/routes"
"github.com/up9inc/mizu/agent/pkg/servicemap"
"github.com/up9inc/mizu/agent/pkg/up9"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
tapApi "github.com/up9inc/mizu/tap/api"
)
var (
ConfigRoutes *gin.RouterGroup
UserRoutes *gin.RouterGroup
InstallRoutes *gin.RouterGroup
OASRoutes *gin.RouterGroup
ServiceMapRoutes *gin.RouterGroup
QueryRoutes *gin.RouterGroup
EntriesRoutes *gin.RouterGroup
MetadataRoutes *gin.RouterGroup
StatusRoutes *gin.RouterGroup
startTime int64
)
func HostApi(socketHarOutputChannel chan<- *tapApi.OutputChannelItem) *gin.Engine {
app := gin.Default()
app.GET("/echo", func(c *gin.Context) {
c.String(http.StatusOK, "Here is Mizu agent")
})
eventHandlers := api.RoutesEventHandlers{
SocketOutChannel: socketHarOutputChannel,
}
app.Use(disableRootStaticCache())
var staticFolder string
if config.Config.StandaloneMode {
staticFolder = "./site-standalone"
} else {
staticFolder = "./site"
}
indexStaticFile := staticFolder + "/index.html"
if err := setUIFlags(indexStaticFile); err != nil {
logger.Log.Errorf("Error setting ui flags, err: %v", err)
}
app.Use(static.ServeRoot("/", staticFolder))
app.NoRoute(func(c *gin.Context) {
c.File(indexStaticFile)
})
app.Use(middlewares.CORSMiddleware()) // This has to be called after the static middleware, does not work if its called before
api.WebSocketRoutes(app, &eventHandlers, startTime)
if config.Config.StandaloneMode {
ConfigRoutes = routes.ConfigRoutes(app)
UserRoutes = routes.UserRoutes(app)
InstallRoutes = routes.InstallRoutes(app)
}
if config.Config.OAS {
OASRoutes = routes.OASRoutes(app)
}
if config.Config.ServiceMap {
ServiceMapRoutes = routes.ServiceMapRoutes(app)
}
QueryRoutes = routes.QueryRoutes(app)
EntriesRoutes = routes.EntriesRoutes(app)
MetadataRoutes = routes.MetadataRoutes(app)
StatusRoutes = routes.StatusRoutes(app)
return app
}
func RunInApiServerMode(namespace string) *gin.Engine {
configureBasenineServer(shared.BasenineHost, shared.BaseninePort)
startTime = time.Now().UnixNano() / int64(time.Millisecond)
api.StartResolving(namespace)
outputItemsChannel := make(chan *tapApi.OutputChannelItem)
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
enableExpFeatureIfNeeded()
go FilterItems(outputItemsChannel, filteredOutputItemsChannel)
go api.StartReadingEntries(filteredOutputItemsChannel, nil, ExtensionsMap)
syncEntriesConfig := getSyncEntriesConfig()
if syncEntriesConfig != nil {
if err := up9.SyncEntries(syncEntriesConfig); err != nil {
logger.Log.Error("Error syncing entries, err: %v", err)
}
}
return HostApi(outputItemsChannel)
}
func configureBasenineServer(host string, port string) {
if !wait.New(
wait.WithProto("tcp"),
wait.WithWait(200*time.Millisecond),
wait.WithBreak(50*time.Millisecond),
wait.WithDeadline(5*time.Second),
wait.WithDebug(config.Config.LogLevel == logging.DEBUG),
).Do([]string{fmt.Sprintf("%s:%s", host, port)}) {
logger.Log.Panicf("Basenine is not available!")
}
// Limit the database size to default 200MB
err := basenine.Limit(host, port, config.Config.MaxDBSizeBytes)
if err != nil {
logger.Log.Panicf("Error while limiting database size: %v", err)
}
// Define the macros
for _, extension := range Extensions {
macros := extension.Dissector.Macros()
for macro, expanded := range macros {
err = basenine.Macro(host, port, macro, expanded)
if err != nil {
logger.Log.Panicf("Error while adding a macro: %v", err)
}
}
}
}
func getSyncEntriesConfig() *shared.SyncEntriesConfig {
syncEntriesConfigJson := os.Getenv(shared.SyncEntriesConfigEnvVar)
if syncEntriesConfigJson == "" {
return nil
}
var syncEntriesConfig = &shared.SyncEntriesConfig{}
err := json.Unmarshal([]byte(syncEntriesConfigJson), syncEntriesConfig)
if err != nil {
panic(fmt.Sprintf("env var %s's value of %s is invalid! json must match the shared.SyncEntriesConfig struct, err: %v", shared.SyncEntriesConfigEnvVar, syncEntriesConfigJson, err))
}
return syncEntriesConfig
}
func FilterItems(inChannel <-chan *tapApi.OutputChannelItem, outChannel chan *tapApi.OutputChannelItem) {
for message := range inChannel {
if message.ConnectionInfo.IsOutgoing && api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) {
continue
}
outChannel <- message
}
}
func enableExpFeatureIfNeeded() {
if config.Config.OAS {
oas.GetOasGeneratorInstance().Start()
}
if config.Config.ServiceMap {
servicemap.GetInstance().SetConfig(config.Config)
}
elastic.GetInstance().Configure(config.Config.Elastic)
}
func disableRootStaticCache() gin.HandlerFunc {
return func(c *gin.Context) {
if c.Request.RequestURI == "/" {
// Disable cache only for the main static route
c.Writer.Header().Set("Cache-Control", "no-store")
}
c.Next()
}
}
func setUIFlags(uiIndexPath string) error {
read, err := ioutil.ReadFile(uiIndexPath)
if err != nil {
return err
}
replacedContent := strings.Replace(string(read), "__IS_OAS_ENABLED__", strconv.FormatBool(config.Config.OAS), 1)
replacedContent = strings.Replace(replacedContent, "__IS_SERVICE_MAP_ENABLED__", strconv.FormatBool(config.Config.ServiceMap), 1)
err = ioutil.WriteFile(uiIndexPath, []byte(replacedContent), 0)
if err != nil {
return err
}
return nil
}

View File

@@ -7,17 +7,10 @@ import (
"github.com/gin-gonic/gin"
)
var (
ConfigPostTapConfigHandler = controllers.PostTapConfig
ConfigGetTapConfigHandler = controllers.GetTapConfig
)
func ConfigRoutes(ginApp *gin.Engine) *gin.RouterGroup {
func ConfigRoutes(ginApp *gin.Engine) {
routeGroup := ginApp.Group("/config")
routeGroup.Use(middlewares.RequiresAuth())
routeGroup.POST("/tap", middlewares.RequiresAdmin(), func(c *gin.Context) { ConfigPostTapConfigHandler(c) })
routeGroup.GET("/tap", func(c *gin.Context) { ConfigGetTapConfigHandler(c) })
return routeGroup
routeGroup.POST("/tap", middlewares.RequiresAdmin(), controllers.PostTapConfig)
routeGroup.GET("/tap", controllers.GetTapConfig)
}

View File

@@ -7,18 +7,11 @@ import (
"github.com/gin-gonic/gin"
)
var (
EntriesGetHandler = controllers.GetEntries
EntriesGetSingleHandler = controllers.GetEntry
)
// EntriesRoutes defines the group of har entries routes.
func EntriesRoutes(ginApp *gin.Engine) *gin.RouterGroup {
func EntriesRoutes(ginApp *gin.Engine) {
routeGroup := ginApp.Group("/entries")
routeGroup.Use(middlewares.RequiresAuth())
routeGroup.GET("/", func(c *gin.Context) { EntriesGetHandler(c) }) // get entries (base/thin entries) and metadata
routeGroup.GET("/:id", func(c *gin.Context) { EntriesGetSingleHandler(c) }) // get single (full) entry
return routeGroup
routeGroup.GET("/", controllers.GetEntries) // get entries (base/thin entries) and metadata
routeGroup.GET("/:id", controllers.GetEntry) // get single (full) entry
}

View File

@@ -6,16 +6,9 @@ import (
"github.com/gin-gonic/gin"
)
var (
InstallGetIsNeededHandler = controllers.IsSetupNecessary
InstallPostAdminHandler = controllers.SetupAdminUser
)
func InstallRoutes(ginApp *gin.Engine) *gin.RouterGroup {
func InstallRoutes(ginApp *gin.Engine) {
routeGroup := ginApp.Group("/install")
routeGroup.GET("/isNeeded", func(c *gin.Context) { InstallGetIsNeededHandler(c) })
routeGroup.POST("/admin", func(c *gin.Context) { InstallPostAdminHandler(c) })
return routeGroup
routeGroup.GET("/isNeeded", controllers.IsSetupNecessary)
routeGroup.POST("/admin", controllers.SetupAdminUser)
}

View File

@@ -6,15 +6,9 @@ import (
"github.com/gin-gonic/gin"
)
var (
MetadataGetVersionHandler = controllers.GetVersion
)
// MetadataRoutes defines the group of metadata routes.
func MetadataRoutes(app *gin.Engine) *gin.RouterGroup {
func MetadataRoutes(app *gin.Engine) {
routeGroup := app.Group("/metadata")
routeGroup.GET("/version", func(c *gin.Context) { MetadataGetVersionHandler(c) })
return routeGroup
routeGroup.GET("/version", controllers.GetVersion)
}

View File

@@ -7,20 +7,12 @@ import (
"github.com/gin-gonic/gin"
)
var (
OASGetServersHandler = controllers.GetOASServers
OASGetAllSpecsHandler = controllers.GetOASAllSpecs
OASGetSingleSpecHandler = controllers.GetOASSpec
)
// OASRoutes methods to access OAS spec
func OASRoutes(ginApp *gin.Engine) *gin.RouterGroup {
func OASRoutes(ginApp *gin.Engine) {
routeGroup := ginApp.Group("/oas")
routeGroup.Use(middlewares.RequiresAuth())
routeGroup.GET("/", func(c *gin.Context) { OASGetServersHandler(c) }) // list of servers in OAS map
routeGroup.GET("/all", func(c *gin.Context) { OASGetAllSpecsHandler(c) }) // list of servers in OAS map
routeGroup.GET("/:id", func(c *gin.Context) { OASGetSingleSpecHandler(c) }) // get OAS spec for given server
return routeGroup
routeGroup.GET("/", controllers.GetOASServers) // list of servers in OAS map
routeGroup.GET("/all", controllers.GetOASAllSpecs) // list of servers in OAS map
routeGroup.GET("/:id", controllers.GetOASSpec) // get OAS spec for given server
}

View File

@@ -7,15 +7,9 @@ import (
"github.com/gin-gonic/gin"
)
var (
QueryPostValidateHandler = controllers.PostValidate
)
func QueryRoutes(ginApp *gin.Engine) *gin.RouterGroup {
func QueryRoutes(ginApp *gin.Engine) {
routeGroup := ginApp.Group("/query")
routeGroup.Use(middlewares.RequiresAuth())
routeGroup.POST("/validate", func(c *gin.Context) { QueryPostValidateHandler(c) })
return routeGroup
routeGroup.POST("/validate", controllers.PostValidate)
}

View File

@@ -7,25 +7,13 @@ import (
"github.com/gin-gonic/gin"
)
var (
ServiceMapGetStatus gin.HandlerFunc
ServiceMapGet gin.HandlerFunc
ServiceMapReset gin.HandlerFunc
)
func ServiceMapRoutes(ginApp *gin.Engine) *gin.RouterGroup {
func ServiceMapRoutes(ginApp *gin.Engine) {
routeGroup := ginApp.Group("/servicemap")
routeGroup.Use(middlewares.RequiresAuth())
controller := controllers.NewServiceMapController()
ServiceMapGetStatus = controller.Status
ServiceMapGet = controller.Get
ServiceMapReset = controller.Reset
routeGroup.GET("/status", func(c *gin.Context) { ServiceMapGetStatus(c) })
routeGroup.GET("/get", func(c *gin.Context) { ServiceMapGet(c) })
routeGroup.GET("/reset", func(c *gin.Context) { ServiceMapReset(c) })
return routeGroup
routeGroup.GET("/status", controller.Status)
routeGroup.GET("/get", controller.Get)
routeGroup.GET("/reset", controller.Reset)
}

View File

@@ -7,39 +7,24 @@ import (
"github.com/gin-gonic/gin"
)
var (
StatusGetHealthCheck = controllers.HealthCheck
StatusPostTappedPods = controllers.PostTappedPods
StatusPostTapperStatus = controllers.PostTapperStatus
StatusGetConnectedTappersCount = controllers.GetConnectedTappersCount
StatusGetTappingStatus = controllers.GetTappingStatus
StatusGetAuthStatus = controllers.GetAuthStatus
StatusGetAnalyzeInformation = controllers.AnalyzeInformation
StatusGetGeneralStats = controllers.GetGeneralStats
StatusGetRecentTLSLinks = controllers.GetRecentTLSLinks
StatusGetCurrentResolvingInformation = controllers.GetCurrentResolvingInformation
)
func StatusRoutes(ginApp *gin.Engine) *gin.RouterGroup {
func StatusRoutes(ginApp *gin.Engine) {
routeGroup := ginApp.Group("/status")
routeGroup.Use(middlewares.RequiresAuth())
routeGroup.GET("/health", func(c *gin.Context) { StatusGetHealthCheck(c) })
routeGroup.GET("/health", controllers.HealthCheck)
routeGroup.POST("/tappedPods", func(c *gin.Context) { StatusPostTappedPods(c) })
routeGroup.POST("/tapperStatus", func(c *gin.Context) { StatusPostTapperStatus(c) })
routeGroup.GET("/connectedTappersCount", func(c *gin.Context) { StatusGetConnectedTappersCount(c) })
routeGroup.GET("/tap", func(c *gin.Context) { StatusGetTappingStatus(c) })
routeGroup.POST("/tappedPods", controllers.PostTappedPods)
routeGroup.POST("/tapperStatus", controllers.PostTapperStatus)
routeGroup.GET("/connectedTappersCount", controllers.GetConnectedTappersCount)
routeGroup.GET("/tap", controllers.GetTappingStatus)
routeGroup.GET("/auth", func(c *gin.Context) { StatusGetAuthStatus(c) })
routeGroup.GET("/auth", controllers.GetAuthStatus)
routeGroup.GET("/analyze", func(c *gin.Context) { StatusGetAnalyzeInformation(c) })
routeGroup.GET("/analyze", controllers.AnalyzeInformation)
routeGroup.GET("/general", func(c *gin.Context) { StatusGetGeneralStats(c) }) // get general stats about entries in DB
routeGroup.GET("/general", controllers.GetGeneralStats) // get general stats about entries in DB
routeGroup.GET("/recentTLSLinks", func(c *gin.Context) { StatusGetRecentTLSLinks(c) })
routeGroup.GET("/recentTLSLinks", controllers.GetRecentTLSLinks)
routeGroup.GET("/resolving", func(c *gin.Context) { StatusGetCurrentResolvingInformation(c) })
return routeGroup
routeGroup.GET("/resolving", controllers.GetCurrentResolvingInformation)
}

View File

@@ -6,18 +6,10 @@ import (
"github.com/gin-gonic/gin"
)
var (
UserPostLogin = controllers.Login
UserPostLogout = controllers.Logout
UserPostRegister = controllers.Register
)
func UserRoutes(ginApp *gin.Engine) *gin.RouterGroup {
func UserRoutes(ginApp *gin.Engine) {
routeGroup := ginApp.Group("/user")
routeGroup.POST("/login", func(c *gin.Context) { UserPostLogin(c) })
routeGroup.POST("/logout", func(c *gin.Context) { UserPostLogout(c) })
routeGroup.POST("/register", func(c *gin.Context) { UserPostRegister(c) })
return routeGroup
routeGroup.POST("/login", controllers.Login)
routeGroup.POST("/logout", controllers.Logout)
routeGroup.POST("/register", controllers.Register)
}

View File

@@ -58,8 +58,10 @@ type TcpID struct {
}
type CounterPair struct {
StreamId int64
Request uint
Response uint
sync.Mutex
}
type GenericMessage struct {

View File

@@ -362,7 +362,7 @@ func representBasicDeliver(event map[string]interface{}) []interface{} {
for name, value := range properties["headers"].(map[string]interface{}) {
headers = append(headers, api.TableData{
Name: name,
Value: value.(string),
Value: value,
Selector: fmt.Sprintf(`request.properties.headers["%s"]`, name),
})
}

View File

@@ -115,7 +115,10 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
if err != nil {
return
}
counterPair.Lock()
counterPair.Request++
requestCounter := counterPair.Request
counterPair.Unlock()
// Check HTTP2 upgrade - HTTP2 Over Cleartext (H2C)
if strings.Contains(strings.ToLower(req.Header.Get("Connection")), "upgrade") && strings.ToLower(req.Header.Get("Upgrade")) == "h2c" {
@@ -127,12 +130,13 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
req.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
ident := fmt.Sprintf(
"%s->%s %s->%s %d %s",
"%d_%s:%s_%s:%s_%d_%s",
counterPair.StreamId,
tcpID.SrcIP,
tcpID.DstIP,
tcpID.SrcPort,
tcpID.DstPort,
counterPair.Request,
requestCounter,
"HTTP1",
)
item := reqResMatcher.registerRequest(ident, req, superTimer.CaptureTime, req.ProtoMinor)
@@ -155,7 +159,10 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
if err != nil {
return
}
counterPair.Lock()
counterPair.Response++
responseCounter := counterPair.Response
counterPair.Unlock()
// Check HTTP2 upgrade - HTTP2 Over Cleartext (H2C)
if res.StatusCode == 101 && strings.Contains(strings.ToLower(res.Header.Get("Connection")), "upgrade") && strings.ToLower(res.Header.Get("Upgrade")) == "h2c" {
@@ -167,12 +174,13 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
res.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
ident := fmt.Sprintf(
"%s->%s %s->%s %d %s",
"%d_%s:%s_%s:%s_%d_%s",
counterPair.StreamId,
tcpID.DstIP,
tcpID.SrcIP,
tcpID.DstPort,
tcpID.SrcPort,
counterPair.Response,
responseCounter,
"HTTP1",
)
item := reqResMatcher.registerResponse(ident, res, superTimer.CaptureTime, res.ProtoMinor)

View File

@@ -1,9 +1,7 @@
package http
import (
"fmt"
"net/http"
"strings"
"sync"
"time"
@@ -23,9 +21,6 @@ func createResponseRequestMatcher() requestResponseMatcher {
}
func (matcher *requestResponseMatcher) registerRequest(ident string, request *http.Request, captureTime time.Time, protoMinor int) *api.OutputChannelItem {
split := splitIdent(ident)
key := genKey(split)
requestHTTPMessage := api.GenericMessage{
IsRequest: true,
CaptureTime: captureTime,
@@ -35,7 +30,7 @@ func (matcher *requestResponseMatcher) registerRequest(ident string, request *ht
},
}
if response, found := matcher.openMessagesMap.LoadAndDelete(key); found {
if response, found := matcher.openMessagesMap.LoadAndDelete(ident); found {
// Type assertion always succeeds because all of the map's values are of api.GenericMessage type
responseHTTPMessage := response.(*api.GenericMessage)
if responseHTTPMessage.IsRequest {
@@ -44,14 +39,11 @@ func (matcher *requestResponseMatcher) registerRequest(ident string, request *ht
return matcher.preparePair(&requestHTTPMessage, responseHTTPMessage, protoMinor)
}
matcher.openMessagesMap.Store(key, &requestHTTPMessage)
matcher.openMessagesMap.Store(ident, &requestHTTPMessage)
return nil
}
func (matcher *requestResponseMatcher) registerResponse(ident string, response *http.Response, captureTime time.Time, protoMinor int) *api.OutputChannelItem {
split := splitIdent(ident)
key := genKey(split)
responseHTTPMessage := api.GenericMessage{
IsRequest: false,
CaptureTime: captureTime,
@@ -61,7 +53,7 @@ func (matcher *requestResponseMatcher) registerResponse(ident string, response *
},
}
if request, found := matcher.openMessagesMap.LoadAndDelete(key); found {
if request, found := matcher.openMessagesMap.LoadAndDelete(ident); found {
// Type assertion always succeeds because all of the map's values are of api.GenericMessage type
requestHTTPMessage := request.(*api.GenericMessage)
if !requestHTTPMessage.IsRequest {
@@ -70,7 +62,7 @@ func (matcher *requestResponseMatcher) registerResponse(ident string, response *
return matcher.preparePair(requestHTTPMessage, &responseHTTPMessage, protoMinor)
}
matcher.openMessagesMap.Store(key, &responseHTTPMessage)
matcher.openMessagesMap.Store(ident, &responseHTTPMessage)
return nil
}
@@ -89,13 +81,3 @@ func (matcher *requestResponseMatcher) preparePair(requestHTTPMessage *api.Gener
},
}
}
func splitIdent(ident string) []string {
ident = strings.Replace(ident, "->", " ", -1)
return strings.Split(ident, " ")
}
func genKey(split []string) string {
key := fmt.Sprintf("%s:%s->%s:%s,%s%s", split[0], split[2], split[1], split[3], split[4], split[5])
return key
}

View File

@@ -368,24 +368,26 @@ func representProduceRequest(data map[string]interface{}) []interface{} {
}
recordsResults := recordsPath.Get(obj)
if len(recordsResults) > 0 {
records := recordsResults[0].([]interface{})
for i, _record := range records {
record := _record.(map[string]interface{})
value := record["value"]
delete(record, "value")
if recordsResults[0] != nil {
records := recordsResults[0].([]interface{})
for i, _record := range records {
record := _record.(map[string]interface{})
value := record["value"]
delete(record, "value")
rep = append(rep, api.SectionData{
Type: api.TABLE,
Title: fmt.Sprintf("Record [%d] Details (topic: %s)", i, topicName),
Data: representMapAsTable(record, fmt.Sprintf(`request.payload.topicData.partitions.partitionData.records.recordBatch.record[%d]`, i), []string{"value"}),
})
rep = append(rep, api.SectionData{
Type: api.TABLE,
Title: fmt.Sprintf("Record [%d] Details (topic: %s)", i, topicName),
Data: representMapAsTable(record, fmt.Sprintf(`request.payload.topicData.partitions.partitionData.records.recordBatch.record[%d]`, i), []string{"value"}),
})
rep = append(rep, api.SectionData{
Type: api.BODY,
Title: fmt.Sprintf("Record [%d] Value", i),
Data: value.(string),
Selector: fmt.Sprintf(`request.payload.topicData.partitions.partitionData.records.recordBatch.record[%d].value`, i),
})
rep = append(rep, api.SectionData{
Type: api.BODY,
Title: fmt.Sprintf("Record [%d] Value", i),
Data: value.(string),
Selector: fmt.Sprintf(`request.payload.topicData.partitions.partitionData.records.recordBatch.record[%d].value`, i),
})
}
}
}
}
@@ -614,22 +616,24 @@ func representFetchResponse(data map[string]interface{}) []interface{} {
Data: representMapAsTable(recordBatch, fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d].recordSet.recordBatch`, i, j), []string{"record"}),
})
for k, _record := range recordBatch["record"].([]interface{}) {
record := _record.(map[string]interface{})
value := record["value"]
if recordBatch["record"] != nil {
for k, _record := range recordBatch["record"].([]interface{}) {
record := _record.(map[string]interface{})
value := record["value"]
rep = append(rep, api.SectionData{
Type: api.TABLE,
Title: fmt.Sprintf("Response [%d] Partition Response [%d] Record [%d] (topic: %s)", i, j, k, topicName),
Data: representMapAsTable(record, fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d].recordSet.recordBatch.record[%d]`, i, j, k), []string{"value"}),
})
rep = append(rep, api.SectionData{
Type: api.TABLE,
Title: fmt.Sprintf("Response [%d] Partition Response [%d] Record [%d] (topic: %s)", i, j, k, topicName),
Data: representMapAsTable(record, fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d].recordSet.recordBatch.record[%d]`, i, j, k), []string{"value"}),
})
rep = append(rep, api.SectionData{
Type: api.BODY,
Title: fmt.Sprintf("Response [%d] Partition Response [%d] Record [%d] Value (topic: %s)", i, j, k, topicName),
Data: value.(string),
Selector: fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d].recordSet.recordBatch.record[%d].value`, i, j, k),
})
rep = append(rep, api.SectionData{
Type: api.BODY,
Title: fmt.Sprintf("Response [%d] Partition Response [%d] Record [%d] Value (topic: %s)", i, j, k, topicName),
Data: value.(string),
Selector: fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d].recordSet.recordBatch.record[%d].value`, i, j, k),
})
}
}
}
}
@@ -730,6 +734,9 @@ func representCreateTopicsRequest(data map[string]interface{}) []interface{} {
Data: string(repPayload),
})
if payload["topics"] == nil {
return rep
}
for i, _topic := range payload["topics"].([]interface{}) {
topic := _topic.(map[string]interface{})
@@ -766,6 +773,9 @@ func representCreateTopicsResponse(data map[string]interface{}) []interface{} {
Data: string(repPayload),
})
if payload["topics"] == nil {
return rep
}
for i, _topic := range payload["topics"].([]interface{}) {
topic := _topic.(map[string]interface{})

View File

@@ -47,13 +47,13 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
if isClient {
_, _, err := ReadRequest(b, tcpID, superTimer)
_, _, err := ReadRequest(b, tcpID, counterPair, superTimer)
if err != nil {
return err
}
superIdentifier.Protocol = &_protocol
} else {
err := ReadResponse(b, tcpID, superTimer, emitter)
err := ReadResponse(b, tcpID, counterPair, superTimer, emitter)
if err != nil {
return err
}
@@ -120,7 +120,11 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
summary = summary[:len(summary)-2]
}
case CreateTopics:
topics := reqDetails["payload"].(map[string]interface{})["topics"].([]interface{})
_topics := reqDetails["payload"].(map[string]interface{})["topics"]
if _topics == nil {
break
}
topics := _topics.([]interface{})
for _, topic := range topics {
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["name"].(string))
}
@@ -128,6 +132,9 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
summary = summary[:len(summary)-2]
}
case DeleteTopics:
if reqDetails["topicNames"] == nil {
break
}
topicNames := reqDetails["topicNames"].([]string)
for _, name := range topicNames {
summary += fmt.Sprintf("%s, ", name)

View File

@@ -19,7 +19,7 @@ type Request struct {
CaptureTime time.Time `json:"captureTime"`
}
func ReadRequest(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer) (apiKey ApiKey, apiVersion int16, err error) {
func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer) (apiKey ApiKey, apiVersion int16, err error) {
d := &decoder{reader: r, remain: 4}
size := d.readInt32()
@@ -214,7 +214,8 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer) (api
}
key := fmt.Sprintf(
"%s:%s->%s:%s::%d",
"%d_%s:%s_%s:%s_%d",
counterPair.StreamId,
tcpID.SrcIP,
tcpID.SrcPort,
tcpID.DstIP,

View File

@@ -16,7 +16,7 @@ type Response struct {
CaptureTime time.Time `json:"captureTime"`
}
func ReadResponse(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter) (err error) {
func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) (err error) {
d := &decoder{reader: r, remain: 4}
size := d.readInt32()
@@ -44,7 +44,8 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer, emi
}
key := fmt.Sprintf(
"%s:%s->%s:%s::%d",
"%d_%s:%s_%s:%s_%d",
counterPair.StreamId,
tcpID.DstIP,
tcpID.DstPort,
tcpID.SrcIP,

View File

@@ -7,15 +7,21 @@ import (
)
func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket) error {
counterPair.Lock()
counterPair.Request++
requestCounter := counterPair.Request
counterPair.Unlock()
ident := fmt.Sprintf(
"%s->%s %s->%s %d",
"%d_%s:%s_%s:%s_%d",
counterPair.StreamId,
tcpID.SrcIP,
tcpID.DstIP,
tcpID.SrcPort,
tcpID.DstPort,
counterPair.Request,
requestCounter,
)
item := reqResMatcher.registerRequest(ident, request, superTimer.CaptureTime)
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{
@@ -31,15 +37,21 @@ func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTim
}
func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket) error {
counterPair.Lock()
counterPair.Response++
responseCounter := counterPair.Response
counterPair.Unlock()
ident := fmt.Sprintf(
"%s->%s %s->%s %d",
"%d_%s:%s_%s:%s_%d",
counterPair.StreamId,
tcpID.DstIP,
tcpID.SrcIP,
tcpID.DstPort,
tcpID.SrcPort,
counterPair.Response,
responseCounter,
)
item := reqResMatcher.registerResponse(ident, response, superTimer.CaptureTime)
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{

View File

@@ -1,8 +1,6 @@
package redis
import (
"fmt"
"strings"
"sync"
"time"
@@ -11,7 +9,7 @@ import (
var reqResMatcher = createResponseRequestMatcher() // global
// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port}_{incremental_counter}
// Key is `{stream_id}_{src_ip}:{dst_ip}_{src_ip}:{src_port}_{incremental_counter}`
type requestResponseMatcher struct {
openMessagesMap *sync.Map
}
@@ -22,9 +20,6 @@ func createResponseRequestMatcher() requestResponseMatcher {
}
func (matcher *requestResponseMatcher) registerRequest(ident string, request *RedisPacket, captureTime time.Time) *api.OutputChannelItem {
split := splitIdent(ident)
key := genKey(split)
requestRedisMessage := api.GenericMessage{
IsRequest: true,
CaptureTime: captureTime,
@@ -37,7 +32,7 @@ func (matcher *requestResponseMatcher) registerRequest(ident string, request *Re
},
}
if response, found := matcher.openMessagesMap.LoadAndDelete(key); found {
if response, found := matcher.openMessagesMap.LoadAndDelete(ident); found {
// Type assertion always succeeds because all of the map's values are of api.GenericMessage type
responseRedisMessage := response.(*api.GenericMessage)
if responseRedisMessage.IsRequest {
@@ -46,14 +41,11 @@ func (matcher *requestResponseMatcher) registerRequest(ident string, request *Re
return matcher.preparePair(&requestRedisMessage, responseRedisMessage)
}
matcher.openMessagesMap.Store(key, &requestRedisMessage)
matcher.openMessagesMap.Store(ident, &requestRedisMessage)
return nil
}
func (matcher *requestResponseMatcher) registerResponse(ident string, response *RedisPacket, captureTime time.Time) *api.OutputChannelItem {
split := splitIdent(ident)
key := genKey(split)
responseRedisMessage := api.GenericMessage{
IsRequest: false,
CaptureTime: captureTime,
@@ -66,7 +58,7 @@ func (matcher *requestResponseMatcher) registerResponse(ident string, response *
},
}
if request, found := matcher.openMessagesMap.LoadAndDelete(key); found {
if request, found := matcher.openMessagesMap.LoadAndDelete(ident); found {
// Type assertion always succeeds because all of the map's values are of api.GenericMessage type
requestRedisMessage := request.(*api.GenericMessage)
if !requestRedisMessage.IsRequest {
@@ -75,7 +67,7 @@ func (matcher *requestResponseMatcher) registerResponse(ident string, response *
return matcher.preparePair(requestRedisMessage, &responseRedisMessage)
}
matcher.openMessagesMap.Store(key, &responseRedisMessage)
matcher.openMessagesMap.Store(ident, &responseRedisMessage)
return nil
}
@@ -90,13 +82,3 @@ func (matcher *requestResponseMatcher) preparePair(requestRedisMessage *api.Gene
},
}
}
func splitIdent(ident string) []string {
ident = strings.Replace(ident, "->", " ", -1)
return strings.Split(ident, " ")
}
func genKey(split []string) string {
key := fmt.Sprintf("%s:%s->%s:%s,%s", split[0], split[2], split[1], split[3], split[4])
return key
}

View File

@@ -82,6 +82,7 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
stream.id = factory.streamsMap.nextId()
for i, extension := range extensions {
counterPair := &api.CounterPair{
StreamId: stream.id,
Request: 0,
Response: 0,
}