mirror of
https://github.com/kubeshark/kubeshark.git
synced 2026-03-10 05:31:36 +00:00
Compare commits
2 Commits
26.0-dev11
...
26.0-dev13
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a42a0cd0b9 | ||
|
|
145004fe43 |
142
agent/main.go
142
agent/main.go
@@ -5,13 +5,24 @@ import (
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strconv"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/gin-contrib/static"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/up9inc/mizu/agent/pkg/elastic"
|
||||
"github.com/up9inc/mizu/agent/pkg/middlewares"
|
||||
"github.com/up9inc/mizu/agent/pkg/models"
|
||||
"github.com/up9inc/mizu/agent/pkg/oas"
|
||||
"github.com/up9inc/mizu/agent/pkg/routes"
|
||||
"github.com/up9inc/mizu/agent/pkg/servicemap"
|
||||
"github.com/up9inc/mizu/agent/pkg/up9"
|
||||
"github.com/up9inc/mizu/agent/pkg/utils"
|
||||
|
||||
"github.com/up9inc/mizu/agent/pkg/api"
|
||||
@@ -35,6 +46,7 @@ var apiServerAddress = flag.String("api-server-address", "", "Address of mizu AP
|
||||
var namespace = flag.String("namespace", "", "Resolve IPs if they belong to resources in this namespace (default is all)")
|
||||
var harsReaderMode = flag.Bool("hars-read", false, "Run in hars-read mode")
|
||||
var harsDir = flag.String("hars-dir", "", "Directory to read hars from")
|
||||
var startTime int64
|
||||
|
||||
const (
|
||||
socketConnectionRetries = 30
|
||||
@@ -60,7 +72,7 @@ func main() {
|
||||
} else if *tapperMode {
|
||||
runInTapperMode()
|
||||
} else if *apiServerMode {
|
||||
utils.StartServer(app.RunInApiServerMode(*namespace))
|
||||
utils.StartServer(runInApiServerMode(*namespace))
|
||||
} else if *harsReaderMode {
|
||||
runInHarReaderMode()
|
||||
}
|
||||
@@ -72,6 +84,77 @@ func main() {
|
||||
logger.Log.Info("Exiting")
|
||||
}
|
||||
|
||||
func hostApi(socketHarOutputChannel chan<- *tapApi.OutputChannelItem) *gin.Engine {
|
||||
app := gin.Default()
|
||||
|
||||
app.GET("/echo", func(c *gin.Context) {
|
||||
c.String(http.StatusOK, "Here is Mizu agent")
|
||||
})
|
||||
|
||||
eventHandlers := api.RoutesEventHandlers{
|
||||
SocketOutChannel: socketHarOutputChannel,
|
||||
}
|
||||
|
||||
app.Use(disableRootStaticCache())
|
||||
|
||||
var staticFolder string
|
||||
if config.Config.StandaloneMode {
|
||||
staticFolder = "./site-standalone"
|
||||
} else {
|
||||
staticFolder = "./site"
|
||||
}
|
||||
|
||||
indexStaticFile := staticFolder + "/index.html"
|
||||
if err := setUIFlags(indexStaticFile); err != nil {
|
||||
logger.Log.Errorf("Error setting ui flags, err: %v", err)
|
||||
}
|
||||
|
||||
app.Use(static.ServeRoot("/", staticFolder))
|
||||
app.NoRoute(func(c *gin.Context) {
|
||||
c.File(indexStaticFile)
|
||||
})
|
||||
|
||||
app.Use(middlewares.CORSMiddleware()) // This has to be called after the static middleware, does not work if its called before
|
||||
|
||||
api.WebSocketRoutes(app, &eventHandlers, startTime)
|
||||
|
||||
if config.Config.StandaloneMode {
|
||||
routes.ConfigRoutes(app)
|
||||
routes.UserRoutes(app)
|
||||
routes.InstallRoutes(app)
|
||||
}
|
||||
if config.Config.OAS {
|
||||
routes.OASRoutes(app)
|
||||
}
|
||||
if config.Config.ServiceMap {
|
||||
routes.ServiceMapRoutes(app)
|
||||
}
|
||||
|
||||
routes.QueryRoutes(app)
|
||||
routes.EntriesRoutes(app)
|
||||
routes.MetadataRoutes(app)
|
||||
routes.StatusRoutes(app)
|
||||
|
||||
return app
|
||||
}
|
||||
|
||||
func runInApiServerMode(namespace string) *gin.Engine {
|
||||
app.ConfigureBasenineServer(shared.BasenineHost, shared.BaseninePort)
|
||||
startTime = time.Now().UnixNano() / int64(time.Millisecond)
|
||||
api.StartResolving(namespace)
|
||||
|
||||
enableExpFeatureIfNeeded()
|
||||
|
||||
syncEntriesConfig := getSyncEntriesConfig()
|
||||
if syncEntriesConfig != nil {
|
||||
if err := up9.SyncEntries(syncEntriesConfig); err != nil {
|
||||
logger.Log.Error("Error syncing entries, err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return hostApi(app.GetEntryInputChannel())
|
||||
}
|
||||
|
||||
func runInTapperMode() {
|
||||
logger.Log.Infof("Starting tapper, websocket address: %s", *apiServerAddress)
|
||||
if *apiServerAddress == "" {
|
||||
@@ -113,7 +196,7 @@ func runInStandaloneMode() {
|
||||
go app.FilterItems(outputItemsChannel, filteredOutputItemsChannel)
|
||||
go api.StartReadingEntries(filteredOutputItemsChannel, nil, app.ExtensionsMap)
|
||||
|
||||
ginApp := app.HostApi(nil)
|
||||
ginApp := hostApi(nil)
|
||||
utils.StartServer(ginApp)
|
||||
}
|
||||
|
||||
@@ -123,10 +206,63 @@ func runInHarReaderMode() {
|
||||
|
||||
go app.FilterItems(outputItemsChannel, filteredHarChannel)
|
||||
go api.StartReadingEntries(filteredHarChannel, harsDir, app.ExtensionsMap)
|
||||
ginApp := app.HostApi(nil)
|
||||
ginApp := hostApi(nil)
|
||||
utils.StartServer(ginApp)
|
||||
}
|
||||
|
||||
func enableExpFeatureIfNeeded() {
|
||||
if config.Config.OAS {
|
||||
oas.GetOasGeneratorInstance().Start()
|
||||
}
|
||||
if config.Config.ServiceMap {
|
||||
servicemap.GetInstance().SetConfig(config.Config)
|
||||
}
|
||||
elastic.GetInstance().Configure(config.Config.Elastic)
|
||||
}
|
||||
|
||||
func getSyncEntriesConfig() *shared.SyncEntriesConfig {
|
||||
syncEntriesConfigJson := os.Getenv(shared.SyncEntriesConfigEnvVar)
|
||||
if syncEntriesConfigJson == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
var syncEntriesConfig = &shared.SyncEntriesConfig{}
|
||||
err := json.Unmarshal([]byte(syncEntriesConfigJson), syncEntriesConfig)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("env var %s's value of %s is invalid! json must match the shared.SyncEntriesConfig struct, err: %v", shared.SyncEntriesConfigEnvVar, syncEntriesConfigJson, err))
|
||||
}
|
||||
|
||||
return syncEntriesConfig
|
||||
}
|
||||
|
||||
func disableRootStaticCache() gin.HandlerFunc {
|
||||
return func(c *gin.Context) {
|
||||
if c.Request.RequestURI == "/" {
|
||||
// Disable cache only for the main static route
|
||||
c.Writer.Header().Set("Cache-Control", "no-store")
|
||||
}
|
||||
|
||||
c.Next()
|
||||
}
|
||||
}
|
||||
|
||||
func setUIFlags(uiIndexPath string) error {
|
||||
read, err := ioutil.ReadFile(uiIndexPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
replacedContent := strings.Replace(string(read), "__IS_OAS_ENABLED__", strconv.FormatBool(config.Config.OAS), 1)
|
||||
replacedContent = strings.Replace(replacedContent, "__IS_SERVICE_MAP_ENABLED__", strconv.FormatBool(config.Config.ServiceMap), 1)
|
||||
|
||||
err = ioutil.WriteFile(uiIndexPath, []byte(replacedContent), 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func parseEnvVar(env string) map[string][]v1.Pod {
|
||||
var mapOfList map[string][]v1.Pod
|
||||
|
||||
|
||||
@@ -1,12 +1,18 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/antelman107/net-wait-go/wait"
|
||||
"github.com/op/go-logging"
|
||||
basenine "github.com/up9inc/basenine/client/go"
|
||||
"github.com/up9inc/mizu/agent/pkg/api"
|
||||
"github.com/up9inc/mizu/agent/pkg/config"
|
||||
"github.com/up9inc/mizu/agent/pkg/controllers"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
tapApi "github.com/up9inc/mizu/tap/api"
|
||||
|
||||
amqpExt "github.com/up9inc/mizu/tap/extensions/amqp"
|
||||
httpExt "github.com/up9inc/mizu/tap/extensions/http"
|
||||
kafkaExt "github.com/up9inc/mizu/tap/extensions/kafka"
|
||||
@@ -60,3 +66,51 @@ func LoadExtensions() {
|
||||
|
||||
controllers.InitExtensionsMap(ExtensionsMap)
|
||||
}
|
||||
|
||||
func ConfigureBasenineServer(host string, port string) {
|
||||
if !wait.New(
|
||||
wait.WithProto("tcp"),
|
||||
wait.WithWait(200*time.Millisecond),
|
||||
wait.WithBreak(50*time.Millisecond),
|
||||
wait.WithDeadline(5*time.Second),
|
||||
wait.WithDebug(config.Config.LogLevel == logging.DEBUG),
|
||||
).Do([]string{fmt.Sprintf("%s:%s", host, port)}) {
|
||||
logger.Log.Panicf("Basenine is not available!")
|
||||
}
|
||||
|
||||
// Limit the database size to default 200MB
|
||||
err := basenine.Limit(host, port, config.Config.MaxDBSizeBytes)
|
||||
if err != nil {
|
||||
logger.Log.Panicf("Error while limiting database size: %v", err)
|
||||
}
|
||||
|
||||
// Define the macros
|
||||
for _, extension := range Extensions {
|
||||
macros := extension.Dissector.Macros()
|
||||
for macro, expanded := range macros {
|
||||
err = basenine.Macro(host, port, macro, expanded)
|
||||
if err != nil {
|
||||
logger.Log.Panicf("Error while adding a macro: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func GetEntryInputChannel() chan *tapApi.OutputChannelItem {
|
||||
outputItemsChannel := make(chan *tapApi.OutputChannelItem)
|
||||
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
|
||||
go FilterItems(outputItemsChannel, filteredOutputItemsChannel)
|
||||
go api.StartReadingEntries(filteredOutputItemsChannel, nil, ExtensionsMap)
|
||||
|
||||
return outputItemsChannel
|
||||
}
|
||||
|
||||
func FilterItems(inChannel <-chan *tapApi.OutputChannelItem, outChannel chan *tapApi.OutputChannelItem) {
|
||||
for message := range inChannel {
|
||||
if message.ConnectionInfo.IsOutgoing && api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) {
|
||||
continue
|
||||
}
|
||||
|
||||
outChannel <- message
|
||||
}
|
||||
}
|
||||
@@ -1,210 +0,0 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/antelman107/net-wait-go/wait"
|
||||
"github.com/gin-contrib/static"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/op/go-logging"
|
||||
basenine "github.com/up9inc/basenine/client/go"
|
||||
"github.com/up9inc/mizu/agent/pkg/api"
|
||||
"github.com/up9inc/mizu/agent/pkg/config"
|
||||
"github.com/up9inc/mizu/agent/pkg/elastic"
|
||||
"github.com/up9inc/mizu/agent/pkg/middlewares"
|
||||
"github.com/up9inc/mizu/agent/pkg/oas"
|
||||
"github.com/up9inc/mizu/agent/pkg/routes"
|
||||
"github.com/up9inc/mizu/agent/pkg/servicemap"
|
||||
"github.com/up9inc/mizu/agent/pkg/up9"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
tapApi "github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
var (
|
||||
ConfigRoutes *gin.RouterGroup
|
||||
UserRoutes *gin.RouterGroup
|
||||
InstallRoutes *gin.RouterGroup
|
||||
OASRoutes *gin.RouterGroup
|
||||
ServiceMapRoutes *gin.RouterGroup
|
||||
QueryRoutes *gin.RouterGroup
|
||||
EntriesRoutes *gin.RouterGroup
|
||||
MetadataRoutes *gin.RouterGroup
|
||||
StatusRoutes *gin.RouterGroup
|
||||
|
||||
startTime int64
|
||||
)
|
||||
|
||||
func HostApi(socketHarOutputChannel chan<- *tapApi.OutputChannelItem) *gin.Engine {
|
||||
app := gin.Default()
|
||||
|
||||
app.GET("/echo", func(c *gin.Context) {
|
||||
c.String(http.StatusOK, "Here is Mizu agent")
|
||||
})
|
||||
|
||||
eventHandlers := api.RoutesEventHandlers{
|
||||
SocketOutChannel: socketHarOutputChannel,
|
||||
}
|
||||
|
||||
app.Use(disableRootStaticCache())
|
||||
|
||||
var staticFolder string
|
||||
if config.Config.StandaloneMode {
|
||||
staticFolder = "./site-standalone"
|
||||
} else {
|
||||
staticFolder = "./site"
|
||||
}
|
||||
|
||||
indexStaticFile := staticFolder + "/index.html"
|
||||
if err := setUIFlags(indexStaticFile); err != nil {
|
||||
logger.Log.Errorf("Error setting ui flags, err: %v", err)
|
||||
}
|
||||
|
||||
app.Use(static.ServeRoot("/", staticFolder))
|
||||
app.NoRoute(func(c *gin.Context) {
|
||||
c.File(indexStaticFile)
|
||||
})
|
||||
|
||||
app.Use(middlewares.CORSMiddleware()) // This has to be called after the static middleware, does not work if its called before
|
||||
|
||||
api.WebSocketRoutes(app, &eventHandlers, startTime)
|
||||
|
||||
if config.Config.StandaloneMode {
|
||||
ConfigRoutes = routes.ConfigRoutes(app)
|
||||
UserRoutes = routes.UserRoutes(app)
|
||||
InstallRoutes = routes.InstallRoutes(app)
|
||||
}
|
||||
if config.Config.OAS {
|
||||
OASRoutes = routes.OASRoutes(app)
|
||||
}
|
||||
if config.Config.ServiceMap {
|
||||
ServiceMapRoutes = routes.ServiceMapRoutes(app)
|
||||
}
|
||||
|
||||
QueryRoutes = routes.QueryRoutes(app)
|
||||
EntriesRoutes = routes.EntriesRoutes(app)
|
||||
MetadataRoutes = routes.MetadataRoutes(app)
|
||||
StatusRoutes = routes.StatusRoutes(app)
|
||||
|
||||
return app
|
||||
}
|
||||
|
||||
func RunInApiServerMode(namespace string) *gin.Engine {
|
||||
configureBasenineServer(shared.BasenineHost, shared.BaseninePort)
|
||||
startTime = time.Now().UnixNano() / int64(time.Millisecond)
|
||||
api.StartResolving(namespace)
|
||||
|
||||
outputItemsChannel := make(chan *tapApi.OutputChannelItem)
|
||||
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
|
||||
enableExpFeatureIfNeeded()
|
||||
go FilterItems(outputItemsChannel, filteredOutputItemsChannel)
|
||||
go api.StartReadingEntries(filteredOutputItemsChannel, nil, ExtensionsMap)
|
||||
|
||||
syncEntriesConfig := getSyncEntriesConfig()
|
||||
if syncEntriesConfig != nil {
|
||||
if err := up9.SyncEntries(syncEntriesConfig); err != nil {
|
||||
logger.Log.Error("Error syncing entries, err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return HostApi(outputItemsChannel)
|
||||
}
|
||||
|
||||
func configureBasenineServer(host string, port string) {
|
||||
if !wait.New(
|
||||
wait.WithProto("tcp"),
|
||||
wait.WithWait(200*time.Millisecond),
|
||||
wait.WithBreak(50*time.Millisecond),
|
||||
wait.WithDeadline(5*time.Second),
|
||||
wait.WithDebug(config.Config.LogLevel == logging.DEBUG),
|
||||
).Do([]string{fmt.Sprintf("%s:%s", host, port)}) {
|
||||
logger.Log.Panicf("Basenine is not available!")
|
||||
}
|
||||
|
||||
// Limit the database size to default 200MB
|
||||
err := basenine.Limit(host, port, config.Config.MaxDBSizeBytes)
|
||||
if err != nil {
|
||||
logger.Log.Panicf("Error while limiting database size: %v", err)
|
||||
}
|
||||
|
||||
// Define the macros
|
||||
for _, extension := range Extensions {
|
||||
macros := extension.Dissector.Macros()
|
||||
for macro, expanded := range macros {
|
||||
err = basenine.Macro(host, port, macro, expanded)
|
||||
if err != nil {
|
||||
logger.Log.Panicf("Error while adding a macro: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func getSyncEntriesConfig() *shared.SyncEntriesConfig {
|
||||
syncEntriesConfigJson := os.Getenv(shared.SyncEntriesConfigEnvVar)
|
||||
if syncEntriesConfigJson == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
var syncEntriesConfig = &shared.SyncEntriesConfig{}
|
||||
err := json.Unmarshal([]byte(syncEntriesConfigJson), syncEntriesConfig)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("env var %s's value of %s is invalid! json must match the shared.SyncEntriesConfig struct, err: %v", shared.SyncEntriesConfigEnvVar, syncEntriesConfigJson, err))
|
||||
}
|
||||
|
||||
return syncEntriesConfig
|
||||
}
|
||||
|
||||
func FilterItems(inChannel <-chan *tapApi.OutputChannelItem, outChannel chan *tapApi.OutputChannelItem) {
|
||||
for message := range inChannel {
|
||||
if message.ConnectionInfo.IsOutgoing && api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) {
|
||||
continue
|
||||
}
|
||||
|
||||
outChannel <- message
|
||||
}
|
||||
}
|
||||
|
||||
func enableExpFeatureIfNeeded() {
|
||||
if config.Config.OAS {
|
||||
oas.GetOasGeneratorInstance().Start()
|
||||
}
|
||||
if config.Config.ServiceMap {
|
||||
servicemap.GetInstance().SetConfig(config.Config)
|
||||
}
|
||||
elastic.GetInstance().Configure(config.Config.Elastic)
|
||||
}
|
||||
|
||||
func disableRootStaticCache() gin.HandlerFunc {
|
||||
return func(c *gin.Context) {
|
||||
if c.Request.RequestURI == "/" {
|
||||
// Disable cache only for the main static route
|
||||
c.Writer.Header().Set("Cache-Control", "no-store")
|
||||
}
|
||||
|
||||
c.Next()
|
||||
}
|
||||
}
|
||||
|
||||
func setUIFlags(uiIndexPath string) error {
|
||||
read, err := ioutil.ReadFile(uiIndexPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
replacedContent := strings.Replace(string(read), "__IS_OAS_ENABLED__", strconv.FormatBool(config.Config.OAS), 1)
|
||||
replacedContent = strings.Replace(replacedContent, "__IS_SERVICE_MAP_ENABLED__", strconv.FormatBool(config.Config.ServiceMap), 1)
|
||||
|
||||
err = ioutil.WriteFile(uiIndexPath, []byte(replacedContent), 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -7,17 +7,10 @@ import (
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
var (
|
||||
ConfigPostTapConfigHandler = controllers.PostTapConfig
|
||||
ConfigGetTapConfigHandler = controllers.GetTapConfig
|
||||
)
|
||||
|
||||
func ConfigRoutes(ginApp *gin.Engine) *gin.RouterGroup {
|
||||
func ConfigRoutes(ginApp *gin.Engine) {
|
||||
routeGroup := ginApp.Group("/config")
|
||||
routeGroup.Use(middlewares.RequiresAuth())
|
||||
|
||||
routeGroup.POST("/tap", middlewares.RequiresAdmin(), func(c *gin.Context) { ConfigPostTapConfigHandler(c) })
|
||||
routeGroup.GET("/tap", func(c *gin.Context) { ConfigGetTapConfigHandler(c) })
|
||||
|
||||
return routeGroup
|
||||
routeGroup.POST("/tap", middlewares.RequiresAdmin(), controllers.PostTapConfig)
|
||||
routeGroup.GET("/tap", controllers.GetTapConfig)
|
||||
}
|
||||
|
||||
@@ -7,18 +7,11 @@ import (
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
var (
|
||||
EntriesGetHandler = controllers.GetEntries
|
||||
EntriesGetSingleHandler = controllers.GetEntry
|
||||
)
|
||||
|
||||
// EntriesRoutes defines the group of har entries routes.
|
||||
func EntriesRoutes(ginApp *gin.Engine) *gin.RouterGroup {
|
||||
func EntriesRoutes(ginApp *gin.Engine) {
|
||||
routeGroup := ginApp.Group("/entries")
|
||||
routeGroup.Use(middlewares.RequiresAuth())
|
||||
|
||||
routeGroup.GET("/", func(c *gin.Context) { EntriesGetHandler(c) }) // get entries (base/thin entries) and metadata
|
||||
routeGroup.GET("/:id", func(c *gin.Context) { EntriesGetSingleHandler(c) }) // get single (full) entry
|
||||
|
||||
return routeGroup
|
||||
routeGroup.GET("/", controllers.GetEntries) // get entries (base/thin entries) and metadata
|
||||
routeGroup.GET("/:id", controllers.GetEntry) // get single (full) entry
|
||||
}
|
||||
|
||||
@@ -6,16 +6,9 @@ import (
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
var (
|
||||
InstallGetIsNeededHandler = controllers.IsSetupNecessary
|
||||
InstallPostAdminHandler = controllers.SetupAdminUser
|
||||
)
|
||||
|
||||
func InstallRoutes(ginApp *gin.Engine) *gin.RouterGroup {
|
||||
func InstallRoutes(ginApp *gin.Engine) {
|
||||
routeGroup := ginApp.Group("/install")
|
||||
|
||||
routeGroup.GET("/isNeeded", func(c *gin.Context) { InstallGetIsNeededHandler(c) })
|
||||
routeGroup.POST("/admin", func(c *gin.Context) { InstallPostAdminHandler(c) })
|
||||
|
||||
return routeGroup
|
||||
routeGroup.GET("/isNeeded", controllers.IsSetupNecessary)
|
||||
routeGroup.POST("/admin", controllers.SetupAdminUser)
|
||||
}
|
||||
|
||||
@@ -6,15 +6,9 @@ import (
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
var (
|
||||
MetadataGetVersionHandler = controllers.GetVersion
|
||||
)
|
||||
|
||||
// MetadataRoutes defines the group of metadata routes.
|
||||
func MetadataRoutes(app *gin.Engine) *gin.RouterGroup {
|
||||
func MetadataRoutes(app *gin.Engine) {
|
||||
routeGroup := app.Group("/metadata")
|
||||
|
||||
routeGroup.GET("/version", func(c *gin.Context) { MetadataGetVersionHandler(c) })
|
||||
|
||||
return routeGroup
|
||||
routeGroup.GET("/version", controllers.GetVersion)
|
||||
}
|
||||
|
||||
@@ -7,20 +7,12 @@ import (
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
var (
|
||||
OASGetServersHandler = controllers.GetOASServers
|
||||
OASGetAllSpecsHandler = controllers.GetOASAllSpecs
|
||||
OASGetSingleSpecHandler = controllers.GetOASSpec
|
||||
)
|
||||
|
||||
// OASRoutes methods to access OAS spec
|
||||
func OASRoutes(ginApp *gin.Engine) *gin.RouterGroup {
|
||||
func OASRoutes(ginApp *gin.Engine) {
|
||||
routeGroup := ginApp.Group("/oas")
|
||||
routeGroup.Use(middlewares.RequiresAuth())
|
||||
|
||||
routeGroup.GET("/", func(c *gin.Context) { OASGetServersHandler(c) }) // list of servers in OAS map
|
||||
routeGroup.GET("/all", func(c *gin.Context) { OASGetAllSpecsHandler(c) }) // list of servers in OAS map
|
||||
routeGroup.GET("/:id", func(c *gin.Context) { OASGetSingleSpecHandler(c) }) // get OAS spec for given server
|
||||
|
||||
return routeGroup
|
||||
routeGroup.GET("/", controllers.GetOASServers) // list of servers in OAS map
|
||||
routeGroup.GET("/all", controllers.GetOASAllSpecs) // list of servers in OAS map
|
||||
routeGroup.GET("/:id", controllers.GetOASSpec) // get OAS spec for given server
|
||||
}
|
||||
|
||||
@@ -7,15 +7,9 @@ import (
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
var (
|
||||
QueryPostValidateHandler = controllers.PostValidate
|
||||
)
|
||||
|
||||
func QueryRoutes(ginApp *gin.Engine) *gin.RouterGroup {
|
||||
func QueryRoutes(ginApp *gin.Engine) {
|
||||
routeGroup := ginApp.Group("/query")
|
||||
routeGroup.Use(middlewares.RequiresAuth())
|
||||
|
||||
routeGroup.POST("/validate", func(c *gin.Context) { QueryPostValidateHandler(c) })
|
||||
|
||||
return routeGroup
|
||||
routeGroup.POST("/validate", controllers.PostValidate)
|
||||
}
|
||||
|
||||
@@ -7,25 +7,13 @@ import (
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
var (
|
||||
ServiceMapGetStatus gin.HandlerFunc
|
||||
ServiceMapGet gin.HandlerFunc
|
||||
ServiceMapReset gin.HandlerFunc
|
||||
)
|
||||
|
||||
func ServiceMapRoutes(ginApp *gin.Engine) *gin.RouterGroup {
|
||||
func ServiceMapRoutes(ginApp *gin.Engine) {
|
||||
routeGroup := ginApp.Group("/servicemap")
|
||||
routeGroup.Use(middlewares.RequiresAuth())
|
||||
|
||||
controller := controllers.NewServiceMapController()
|
||||
|
||||
ServiceMapGetStatus = controller.Status
|
||||
ServiceMapGet = controller.Get
|
||||
ServiceMapReset = controller.Reset
|
||||
|
||||
routeGroup.GET("/status", func(c *gin.Context) { ServiceMapGetStatus(c) })
|
||||
routeGroup.GET("/get", func(c *gin.Context) { ServiceMapGet(c) })
|
||||
routeGroup.GET("/reset", func(c *gin.Context) { ServiceMapReset(c) })
|
||||
|
||||
return routeGroup
|
||||
routeGroup.GET("/status", controller.Status)
|
||||
routeGroup.GET("/get", controller.Get)
|
||||
routeGroup.GET("/reset", controller.Reset)
|
||||
}
|
||||
|
||||
@@ -7,39 +7,24 @@ import (
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
var (
|
||||
StatusGetHealthCheck = controllers.HealthCheck
|
||||
StatusPostTappedPods = controllers.PostTappedPods
|
||||
StatusPostTapperStatus = controllers.PostTapperStatus
|
||||
StatusGetConnectedTappersCount = controllers.GetConnectedTappersCount
|
||||
StatusGetTappingStatus = controllers.GetTappingStatus
|
||||
StatusGetAuthStatus = controllers.GetAuthStatus
|
||||
StatusGetAnalyzeInformation = controllers.AnalyzeInformation
|
||||
StatusGetGeneralStats = controllers.GetGeneralStats
|
||||
StatusGetRecentTLSLinks = controllers.GetRecentTLSLinks
|
||||
StatusGetCurrentResolvingInformation = controllers.GetCurrentResolvingInformation
|
||||
)
|
||||
|
||||
func StatusRoutes(ginApp *gin.Engine) *gin.RouterGroup {
|
||||
func StatusRoutes(ginApp *gin.Engine) {
|
||||
routeGroup := ginApp.Group("/status")
|
||||
routeGroup.Use(middlewares.RequiresAuth())
|
||||
|
||||
routeGroup.GET("/health", func(c *gin.Context) { StatusGetHealthCheck(c) })
|
||||
routeGroup.GET("/health", controllers.HealthCheck)
|
||||
|
||||
routeGroup.POST("/tappedPods", func(c *gin.Context) { StatusPostTappedPods(c) })
|
||||
routeGroup.POST("/tapperStatus", func(c *gin.Context) { StatusPostTapperStatus(c) })
|
||||
routeGroup.GET("/connectedTappersCount", func(c *gin.Context) { StatusGetConnectedTappersCount(c) })
|
||||
routeGroup.GET("/tap", func(c *gin.Context) { StatusGetTappingStatus(c) })
|
||||
routeGroup.POST("/tappedPods", controllers.PostTappedPods)
|
||||
routeGroup.POST("/tapperStatus", controllers.PostTapperStatus)
|
||||
routeGroup.GET("/connectedTappersCount", controllers.GetConnectedTappersCount)
|
||||
routeGroup.GET("/tap", controllers.GetTappingStatus)
|
||||
|
||||
routeGroup.GET("/auth", func(c *gin.Context) { StatusGetAuthStatus(c) })
|
||||
routeGroup.GET("/auth", controllers.GetAuthStatus)
|
||||
|
||||
routeGroup.GET("/analyze", func(c *gin.Context) { StatusGetAnalyzeInformation(c) })
|
||||
routeGroup.GET("/analyze", controllers.AnalyzeInformation)
|
||||
|
||||
routeGroup.GET("/general", func(c *gin.Context) { StatusGetGeneralStats(c) }) // get general stats about entries in DB
|
||||
routeGroup.GET("/general", controllers.GetGeneralStats) // get general stats about entries in DB
|
||||
|
||||
routeGroup.GET("/recentTLSLinks", func(c *gin.Context) { StatusGetRecentTLSLinks(c) })
|
||||
routeGroup.GET("/recentTLSLinks", controllers.GetRecentTLSLinks)
|
||||
|
||||
routeGroup.GET("/resolving", func(c *gin.Context) { StatusGetCurrentResolvingInformation(c) })
|
||||
|
||||
return routeGroup
|
||||
routeGroup.GET("/resolving", controllers.GetCurrentResolvingInformation)
|
||||
}
|
||||
|
||||
@@ -6,18 +6,10 @@ import (
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
var (
|
||||
UserPostLogin = controllers.Login
|
||||
UserPostLogout = controllers.Logout
|
||||
UserPostRegister = controllers.Register
|
||||
)
|
||||
|
||||
func UserRoutes(ginApp *gin.Engine) *gin.RouterGroup {
|
||||
func UserRoutes(ginApp *gin.Engine) {
|
||||
routeGroup := ginApp.Group("/user")
|
||||
|
||||
routeGroup.POST("/login", func(c *gin.Context) { UserPostLogin(c) })
|
||||
routeGroup.POST("/logout", func(c *gin.Context) { UserPostLogout(c) })
|
||||
routeGroup.POST("/register", func(c *gin.Context) { UserPostRegister(c) })
|
||||
|
||||
return routeGroup
|
||||
routeGroup.POST("/login", controllers.Login)
|
||||
routeGroup.POST("/logout", controllers.Logout)
|
||||
routeGroup.POST("/register", controllers.Register)
|
||||
}
|
||||
|
||||
@@ -58,8 +58,10 @@ type TcpID struct {
|
||||
}
|
||||
|
||||
type CounterPair struct {
|
||||
StreamId int64
|
||||
Request uint
|
||||
Response uint
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
type GenericMessage struct {
|
||||
|
||||
@@ -362,7 +362,7 @@ func representBasicDeliver(event map[string]interface{}) []interface{} {
|
||||
for name, value := range properties["headers"].(map[string]interface{}) {
|
||||
headers = append(headers, api.TableData{
|
||||
Name: name,
|
||||
Value: value.(string),
|
||||
Value: value,
|
||||
Selector: fmt.Sprintf(`request.properties.headers["%s"]`, name),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -115,7 +115,10 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
counterPair.Lock()
|
||||
counterPair.Request++
|
||||
requestCounter := counterPair.Request
|
||||
counterPair.Unlock()
|
||||
|
||||
// Check HTTP2 upgrade - HTTP2 Over Cleartext (H2C)
|
||||
if strings.Contains(strings.ToLower(req.Header.Get("Connection")), "upgrade") && strings.ToLower(req.Header.Get("Upgrade")) == "h2c" {
|
||||
@@ -127,12 +130,13 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
|
||||
req.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
|
||||
|
||||
ident := fmt.Sprintf(
|
||||
"%s->%s %s->%s %d %s",
|
||||
"%d_%s:%s_%s:%s_%d_%s",
|
||||
counterPair.StreamId,
|
||||
tcpID.SrcIP,
|
||||
tcpID.DstIP,
|
||||
tcpID.SrcPort,
|
||||
tcpID.DstPort,
|
||||
counterPair.Request,
|
||||
requestCounter,
|
||||
"HTTP1",
|
||||
)
|
||||
item := reqResMatcher.registerRequest(ident, req, superTimer.CaptureTime, req.ProtoMinor)
|
||||
@@ -155,7 +159,10 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
counterPair.Lock()
|
||||
counterPair.Response++
|
||||
responseCounter := counterPair.Response
|
||||
counterPair.Unlock()
|
||||
|
||||
// Check HTTP2 upgrade - HTTP2 Over Cleartext (H2C)
|
||||
if res.StatusCode == 101 && strings.Contains(strings.ToLower(res.Header.Get("Connection")), "upgrade") && strings.ToLower(res.Header.Get("Upgrade")) == "h2c" {
|
||||
@@ -167,12 +174,13 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
|
||||
res.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
|
||||
|
||||
ident := fmt.Sprintf(
|
||||
"%s->%s %s->%s %d %s",
|
||||
"%d_%s:%s_%s:%s_%d_%s",
|
||||
counterPair.StreamId,
|
||||
tcpID.DstIP,
|
||||
tcpID.SrcIP,
|
||||
tcpID.DstPort,
|
||||
tcpID.SrcPort,
|
||||
counterPair.Response,
|
||||
responseCounter,
|
||||
"HTTP1",
|
||||
)
|
||||
item := reqResMatcher.registerResponse(ident, res, superTimer.CaptureTime, res.ProtoMinor)
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
package http
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -23,9 +21,6 @@ func createResponseRequestMatcher() requestResponseMatcher {
|
||||
}
|
||||
|
||||
func (matcher *requestResponseMatcher) registerRequest(ident string, request *http.Request, captureTime time.Time, protoMinor int) *api.OutputChannelItem {
|
||||
split := splitIdent(ident)
|
||||
key := genKey(split)
|
||||
|
||||
requestHTTPMessage := api.GenericMessage{
|
||||
IsRequest: true,
|
||||
CaptureTime: captureTime,
|
||||
@@ -35,7 +30,7 @@ func (matcher *requestResponseMatcher) registerRequest(ident string, request *ht
|
||||
},
|
||||
}
|
||||
|
||||
if response, found := matcher.openMessagesMap.LoadAndDelete(key); found {
|
||||
if response, found := matcher.openMessagesMap.LoadAndDelete(ident); found {
|
||||
// Type assertion always succeeds because all of the map's values are of api.GenericMessage type
|
||||
responseHTTPMessage := response.(*api.GenericMessage)
|
||||
if responseHTTPMessage.IsRequest {
|
||||
@@ -44,14 +39,11 @@ func (matcher *requestResponseMatcher) registerRequest(ident string, request *ht
|
||||
return matcher.preparePair(&requestHTTPMessage, responseHTTPMessage, protoMinor)
|
||||
}
|
||||
|
||||
matcher.openMessagesMap.Store(key, &requestHTTPMessage)
|
||||
matcher.openMessagesMap.Store(ident, &requestHTTPMessage)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (matcher *requestResponseMatcher) registerResponse(ident string, response *http.Response, captureTime time.Time, protoMinor int) *api.OutputChannelItem {
|
||||
split := splitIdent(ident)
|
||||
key := genKey(split)
|
||||
|
||||
responseHTTPMessage := api.GenericMessage{
|
||||
IsRequest: false,
|
||||
CaptureTime: captureTime,
|
||||
@@ -61,7 +53,7 @@ func (matcher *requestResponseMatcher) registerResponse(ident string, response *
|
||||
},
|
||||
}
|
||||
|
||||
if request, found := matcher.openMessagesMap.LoadAndDelete(key); found {
|
||||
if request, found := matcher.openMessagesMap.LoadAndDelete(ident); found {
|
||||
// Type assertion always succeeds because all of the map's values are of api.GenericMessage type
|
||||
requestHTTPMessage := request.(*api.GenericMessage)
|
||||
if !requestHTTPMessage.IsRequest {
|
||||
@@ -70,7 +62,7 @@ func (matcher *requestResponseMatcher) registerResponse(ident string, response *
|
||||
return matcher.preparePair(requestHTTPMessage, &responseHTTPMessage, protoMinor)
|
||||
}
|
||||
|
||||
matcher.openMessagesMap.Store(key, &responseHTTPMessage)
|
||||
matcher.openMessagesMap.Store(ident, &responseHTTPMessage)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -89,13 +81,3 @@ func (matcher *requestResponseMatcher) preparePair(requestHTTPMessage *api.Gener
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func splitIdent(ident string) []string {
|
||||
ident = strings.Replace(ident, "->", " ", -1)
|
||||
return strings.Split(ident, " ")
|
||||
}
|
||||
|
||||
func genKey(split []string) string {
|
||||
key := fmt.Sprintf("%s:%s->%s:%s,%s%s", split[0], split[2], split[1], split[3], split[4], split[5])
|
||||
return key
|
||||
}
|
||||
|
||||
@@ -368,24 +368,26 @@ func representProduceRequest(data map[string]interface{}) []interface{} {
|
||||
}
|
||||
recordsResults := recordsPath.Get(obj)
|
||||
if len(recordsResults) > 0 {
|
||||
records := recordsResults[0].([]interface{})
|
||||
for i, _record := range records {
|
||||
record := _record.(map[string]interface{})
|
||||
value := record["value"]
|
||||
delete(record, "value")
|
||||
if recordsResults[0] != nil {
|
||||
records := recordsResults[0].([]interface{})
|
||||
for i, _record := range records {
|
||||
record := _record.(map[string]interface{})
|
||||
value := record["value"]
|
||||
delete(record, "value")
|
||||
|
||||
rep = append(rep, api.SectionData{
|
||||
Type: api.TABLE,
|
||||
Title: fmt.Sprintf("Record [%d] Details (topic: %s)", i, topicName),
|
||||
Data: representMapAsTable(record, fmt.Sprintf(`request.payload.topicData.partitions.partitionData.records.recordBatch.record[%d]`, i), []string{"value"}),
|
||||
})
|
||||
rep = append(rep, api.SectionData{
|
||||
Type: api.TABLE,
|
||||
Title: fmt.Sprintf("Record [%d] Details (topic: %s)", i, topicName),
|
||||
Data: representMapAsTable(record, fmt.Sprintf(`request.payload.topicData.partitions.partitionData.records.recordBatch.record[%d]`, i), []string{"value"}),
|
||||
})
|
||||
|
||||
rep = append(rep, api.SectionData{
|
||||
Type: api.BODY,
|
||||
Title: fmt.Sprintf("Record [%d] Value", i),
|
||||
Data: value.(string),
|
||||
Selector: fmt.Sprintf(`request.payload.topicData.partitions.partitionData.records.recordBatch.record[%d].value`, i),
|
||||
})
|
||||
rep = append(rep, api.SectionData{
|
||||
Type: api.BODY,
|
||||
Title: fmt.Sprintf("Record [%d] Value", i),
|
||||
Data: value.(string),
|
||||
Selector: fmt.Sprintf(`request.payload.topicData.partitions.partitionData.records.recordBatch.record[%d].value`, i),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -614,22 +616,24 @@ func representFetchResponse(data map[string]interface{}) []interface{} {
|
||||
Data: representMapAsTable(recordBatch, fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d].recordSet.recordBatch`, i, j), []string{"record"}),
|
||||
})
|
||||
|
||||
for k, _record := range recordBatch["record"].([]interface{}) {
|
||||
record := _record.(map[string]interface{})
|
||||
value := record["value"]
|
||||
if recordBatch["record"] != nil {
|
||||
for k, _record := range recordBatch["record"].([]interface{}) {
|
||||
record := _record.(map[string]interface{})
|
||||
value := record["value"]
|
||||
|
||||
rep = append(rep, api.SectionData{
|
||||
Type: api.TABLE,
|
||||
Title: fmt.Sprintf("Response [%d] Partition Response [%d] Record [%d] (topic: %s)", i, j, k, topicName),
|
||||
Data: representMapAsTable(record, fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d].recordSet.recordBatch.record[%d]`, i, j, k), []string{"value"}),
|
||||
})
|
||||
rep = append(rep, api.SectionData{
|
||||
Type: api.TABLE,
|
||||
Title: fmt.Sprintf("Response [%d] Partition Response [%d] Record [%d] (topic: %s)", i, j, k, topicName),
|
||||
Data: representMapAsTable(record, fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d].recordSet.recordBatch.record[%d]`, i, j, k), []string{"value"}),
|
||||
})
|
||||
|
||||
rep = append(rep, api.SectionData{
|
||||
Type: api.BODY,
|
||||
Title: fmt.Sprintf("Response [%d] Partition Response [%d] Record [%d] Value (topic: %s)", i, j, k, topicName),
|
||||
Data: value.(string),
|
||||
Selector: fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d].recordSet.recordBatch.record[%d].value`, i, j, k),
|
||||
})
|
||||
rep = append(rep, api.SectionData{
|
||||
Type: api.BODY,
|
||||
Title: fmt.Sprintf("Response [%d] Partition Response [%d] Record [%d] Value (topic: %s)", i, j, k, topicName),
|
||||
Data: value.(string),
|
||||
Selector: fmt.Sprintf(`response.payload.responses[%d].partitionResponses[%d].recordSet.recordBatch.record[%d].value`, i, j, k),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -730,6 +734,9 @@ func representCreateTopicsRequest(data map[string]interface{}) []interface{} {
|
||||
Data: string(repPayload),
|
||||
})
|
||||
|
||||
if payload["topics"] == nil {
|
||||
return rep
|
||||
}
|
||||
for i, _topic := range payload["topics"].([]interface{}) {
|
||||
topic := _topic.(map[string]interface{})
|
||||
|
||||
@@ -766,6 +773,9 @@ func representCreateTopicsResponse(data map[string]interface{}) []interface{} {
|
||||
Data: string(repPayload),
|
||||
})
|
||||
|
||||
if payload["topics"] == nil {
|
||||
return rep
|
||||
}
|
||||
for i, _topic := range payload["topics"].([]interface{}) {
|
||||
topic := _topic.(map[string]interface{})
|
||||
|
||||
|
||||
@@ -47,13 +47,13 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
}
|
||||
|
||||
if isClient {
|
||||
_, _, err := ReadRequest(b, tcpID, superTimer)
|
||||
_, _, err := ReadRequest(b, tcpID, counterPair, superTimer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
superIdentifier.Protocol = &_protocol
|
||||
} else {
|
||||
err := ReadResponse(b, tcpID, superTimer, emitter)
|
||||
err := ReadResponse(b, tcpID, counterPair, superTimer, emitter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -120,7 +120,11 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
||||
summary = summary[:len(summary)-2]
|
||||
}
|
||||
case CreateTopics:
|
||||
topics := reqDetails["payload"].(map[string]interface{})["topics"].([]interface{})
|
||||
_topics := reqDetails["payload"].(map[string]interface{})["topics"]
|
||||
if _topics == nil {
|
||||
break
|
||||
}
|
||||
topics := _topics.([]interface{})
|
||||
for _, topic := range topics {
|
||||
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["name"].(string))
|
||||
}
|
||||
@@ -128,6 +132,9 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
||||
summary = summary[:len(summary)-2]
|
||||
}
|
||||
case DeleteTopics:
|
||||
if reqDetails["topicNames"] == nil {
|
||||
break
|
||||
}
|
||||
topicNames := reqDetails["topicNames"].([]string)
|
||||
for _, name := range topicNames {
|
||||
summary += fmt.Sprintf("%s, ", name)
|
||||
|
||||
@@ -19,7 +19,7 @@ type Request struct {
|
||||
CaptureTime time.Time `json:"captureTime"`
|
||||
}
|
||||
|
||||
func ReadRequest(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer) (apiKey ApiKey, apiVersion int16, err error) {
|
||||
func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer) (apiKey ApiKey, apiVersion int16, err error) {
|
||||
d := &decoder{reader: r, remain: 4}
|
||||
size := d.readInt32()
|
||||
|
||||
@@ -214,7 +214,8 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer) (api
|
||||
}
|
||||
|
||||
key := fmt.Sprintf(
|
||||
"%s:%s->%s:%s::%d",
|
||||
"%d_%s:%s_%s:%s_%d",
|
||||
counterPair.StreamId,
|
||||
tcpID.SrcIP,
|
||||
tcpID.SrcPort,
|
||||
tcpID.DstIP,
|
||||
|
||||
@@ -16,7 +16,7 @@ type Response struct {
|
||||
CaptureTime time.Time `json:"captureTime"`
|
||||
}
|
||||
|
||||
func ReadResponse(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter) (err error) {
|
||||
func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) (err error) {
|
||||
d := &decoder{reader: r, remain: 4}
|
||||
size := d.readInt32()
|
||||
|
||||
@@ -44,7 +44,8 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer, emi
|
||||
}
|
||||
|
||||
key := fmt.Sprintf(
|
||||
"%s:%s->%s:%s::%d",
|
||||
"%d_%s:%s_%s:%s_%d",
|
||||
counterPair.StreamId,
|
||||
tcpID.DstIP,
|
||||
tcpID.DstPort,
|
||||
tcpID.SrcIP,
|
||||
|
||||
@@ -7,15 +7,21 @@ import (
|
||||
)
|
||||
|
||||
func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket) error {
|
||||
counterPair.Lock()
|
||||
counterPair.Request++
|
||||
requestCounter := counterPair.Request
|
||||
counterPair.Unlock()
|
||||
|
||||
ident := fmt.Sprintf(
|
||||
"%s->%s %s->%s %d",
|
||||
"%d_%s:%s_%s:%s_%d",
|
||||
counterPair.StreamId,
|
||||
tcpID.SrcIP,
|
||||
tcpID.DstIP,
|
||||
tcpID.SrcPort,
|
||||
tcpID.DstPort,
|
||||
counterPair.Request,
|
||||
requestCounter,
|
||||
)
|
||||
|
||||
item := reqResMatcher.registerRequest(ident, request, superTimer.CaptureTime)
|
||||
if item != nil {
|
||||
item.ConnectionInfo = &api.ConnectionInfo{
|
||||
@@ -31,15 +37,21 @@ func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTim
|
||||
}
|
||||
|
||||
func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket) error {
|
||||
counterPair.Lock()
|
||||
counterPair.Response++
|
||||
responseCounter := counterPair.Response
|
||||
counterPair.Unlock()
|
||||
|
||||
ident := fmt.Sprintf(
|
||||
"%s->%s %s->%s %d",
|
||||
"%d_%s:%s_%s:%s_%d",
|
||||
counterPair.StreamId,
|
||||
tcpID.DstIP,
|
||||
tcpID.SrcIP,
|
||||
tcpID.DstPort,
|
||||
tcpID.SrcPort,
|
||||
counterPair.Response,
|
||||
responseCounter,
|
||||
)
|
||||
|
||||
item := reqResMatcher.registerResponse(ident, response, superTimer.CaptureTime)
|
||||
if item != nil {
|
||||
item.ConnectionInfo = &api.ConnectionInfo{
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
package redis
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -11,7 +9,7 @@ import (
|
||||
|
||||
var reqResMatcher = createResponseRequestMatcher() // global
|
||||
|
||||
// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port}_{incremental_counter}
|
||||
// Key is `{stream_id}_{src_ip}:{dst_ip}_{src_ip}:{src_port}_{incremental_counter}`
|
||||
type requestResponseMatcher struct {
|
||||
openMessagesMap *sync.Map
|
||||
}
|
||||
@@ -22,9 +20,6 @@ func createResponseRequestMatcher() requestResponseMatcher {
|
||||
}
|
||||
|
||||
func (matcher *requestResponseMatcher) registerRequest(ident string, request *RedisPacket, captureTime time.Time) *api.OutputChannelItem {
|
||||
split := splitIdent(ident)
|
||||
key := genKey(split)
|
||||
|
||||
requestRedisMessage := api.GenericMessage{
|
||||
IsRequest: true,
|
||||
CaptureTime: captureTime,
|
||||
@@ -37,7 +32,7 @@ func (matcher *requestResponseMatcher) registerRequest(ident string, request *Re
|
||||
},
|
||||
}
|
||||
|
||||
if response, found := matcher.openMessagesMap.LoadAndDelete(key); found {
|
||||
if response, found := matcher.openMessagesMap.LoadAndDelete(ident); found {
|
||||
// Type assertion always succeeds because all of the map's values are of api.GenericMessage type
|
||||
responseRedisMessage := response.(*api.GenericMessage)
|
||||
if responseRedisMessage.IsRequest {
|
||||
@@ -46,14 +41,11 @@ func (matcher *requestResponseMatcher) registerRequest(ident string, request *Re
|
||||
return matcher.preparePair(&requestRedisMessage, responseRedisMessage)
|
||||
}
|
||||
|
||||
matcher.openMessagesMap.Store(key, &requestRedisMessage)
|
||||
matcher.openMessagesMap.Store(ident, &requestRedisMessage)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (matcher *requestResponseMatcher) registerResponse(ident string, response *RedisPacket, captureTime time.Time) *api.OutputChannelItem {
|
||||
split := splitIdent(ident)
|
||||
key := genKey(split)
|
||||
|
||||
responseRedisMessage := api.GenericMessage{
|
||||
IsRequest: false,
|
||||
CaptureTime: captureTime,
|
||||
@@ -66,7 +58,7 @@ func (matcher *requestResponseMatcher) registerResponse(ident string, response *
|
||||
},
|
||||
}
|
||||
|
||||
if request, found := matcher.openMessagesMap.LoadAndDelete(key); found {
|
||||
if request, found := matcher.openMessagesMap.LoadAndDelete(ident); found {
|
||||
// Type assertion always succeeds because all of the map's values are of api.GenericMessage type
|
||||
requestRedisMessage := request.(*api.GenericMessage)
|
||||
if !requestRedisMessage.IsRequest {
|
||||
@@ -75,7 +67,7 @@ func (matcher *requestResponseMatcher) registerResponse(ident string, response *
|
||||
return matcher.preparePair(requestRedisMessage, &responseRedisMessage)
|
||||
}
|
||||
|
||||
matcher.openMessagesMap.Store(key, &responseRedisMessage)
|
||||
matcher.openMessagesMap.Store(ident, &responseRedisMessage)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -90,13 +82,3 @@ func (matcher *requestResponseMatcher) preparePair(requestRedisMessage *api.Gene
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func splitIdent(ident string) []string {
|
||||
ident = strings.Replace(ident, "->", " ", -1)
|
||||
return strings.Split(ident, " ")
|
||||
}
|
||||
|
||||
func genKey(split []string) string {
|
||||
key := fmt.Sprintf("%s:%s->%s:%s,%s", split[0], split[2], split[1], split[3], split[4])
|
||||
return key
|
||||
}
|
||||
|
||||
@@ -82,6 +82,7 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
|
||||
stream.id = factory.streamsMap.nextId()
|
||||
for i, extension := range extensions {
|
||||
counterPair := &api.CounterPair{
|
||||
StreamId: stream.id,
|
||||
Request: 0,
|
||||
Response: 0,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user