Compare commits

...

2 Commits

Author SHA1 Message Date
M. Mert Yıldıran
5fc3e38c1a Fix the Kafka ApiKey query and add ApiKeyName field (human-readable ApiKey) (#1080)
* Fix the Kafka `ApiKey` query and add `ApiKeyName` field (human-readable `ApiKey`)

* Update the dataset for Kafka unit tests

* #run_acceptance_tests
2022-05-15 09:42:32 +03:00
RoyUP9
09a0fca2c2 Extracted insert to database functionality (#1082) 2022-05-15 09:19:33 +03:00
8 changed files with 90 additions and 28 deletions

View File

@@ -373,6 +373,7 @@ func handleIncomingMessageAsTapper(socketConnection *websocket.Conn) {
func initializeDependencies() {
dependency.RegisterGenerator(dependency.ServiceMapGeneratorDependency, func() interface{} { return servicemap.GetDefaultServiceMapInstance() })
dependency.RegisterGenerator(dependency.OasGeneratorDependency, func() interface{} { return oas.GetDefaultOasGeneratorInstance() })
dependency.RegisterGenerator(dependency.EntriesInserter, func() interface{} { return api.GetBasenineEntryInserterInstance() })
dependency.RegisterGenerator(dependency.EntriesProvider, func() interface{} { return &entries.BasenineEntriesProvider{} })
dependency.RegisterGenerator(dependency.EntriesSocketStreamer, func() interface{} { return &api.BasenineEntryStreamer{} })
dependency.RegisterGenerator(dependency.EntryStreamerSocketConnector, func() interface{} { return &api.DefaultEntryStreamerSocketConnector{} })

View File

@@ -25,10 +25,7 @@ import (
"github.com/up9inc/mizu/agent/pkg/utils"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared"
tapApi "github.com/up9inc/mizu/tap/api"
basenine "github.com/up9inc/basenine/client/go"
)
var k8sResolver *resolver.Resolver
@@ -103,20 +100,6 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
panic("Channel of captured messages is nil")
}
BasenineReconnect:
connection, err := basenine.NewConnection(shared.BasenineHost, shared.BaseninePort)
if err != nil {
logger.Log.Errorf("Can't establish a new connection to Basenine server: %v", err)
time.Sleep(shared.BasenineReconnectInterval * time.Second)
goto BasenineReconnect
}
if err = connection.InsertMode(); err != nil {
logger.Log.Errorf("Insert mode call failed: %v", err)
connection.Close()
time.Sleep(shared.BasenineReconnectInterval * time.Second)
goto BasenineReconnect
}
disableOASValidation := false
ctx := context.Background()
doc, contractContent, router, err := loadOAS(ctx)
@@ -163,11 +146,9 @@ BasenineReconnect:
providers.EntryAdded(len(data))
if err = connection.SendText(string(data)); err != nil {
logger.Log.Errorf("An error occured while inserting a new record to database: %v", err)
connection.Close()
time.Sleep(shared.BasenineReconnectInterval * time.Second)
goto BasenineReconnect
entryInserter := dependency.GetInstance(dependency.EntriesInserter).(EntryInserter)
if err := entryInserter.Insert(mizuEntry); err != nil {
logger.Log.Errorf("Error inserting entry, err: %v", err)
}
serviceMapGenerator := dependency.GetInstance(dependency.ServiceMapGeneratorDependency).(servicemap.ServiceMapSink)

View File

@@ -0,0 +1,71 @@
package api
import (
"encoding/json"
"fmt"
basenine "github.com/up9inc/basenine/client/go"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/tap/api"
"sync"
"time"
)
type EntryInserter interface {
Insert(entry *api.Entry) error
}
type BasenineEntryInserter struct {
connection *basenine.Connection
}
var instance *BasenineEntryInserter
var once sync.Once
func GetBasenineEntryInserterInstance() *BasenineEntryInserter {
once.Do(func() {
instance = &BasenineEntryInserter{}
})
return instance
}
func (e *BasenineEntryInserter) Insert(entry *api.Entry) error {
if e.connection == nil {
e.connection = initializeConnection()
}
data, err := json.Marshal(entry)
if err != nil {
return fmt.Errorf("error marshling entry, err: %v", err)
}
if err := e.connection.SendText(string(data)); err != nil {
e.connection.Close()
e.connection = nil
return fmt.Errorf("error sending text to database, err: %v", err)
}
return nil
}
func initializeConnection() *basenine.Connection{
for {
connection, err := basenine.NewConnection(shared.BasenineHost, shared.BaseninePort)
if err != nil {
logger.Log.Errorf("Can't establish a new connection to Basenine server: %v", err)
time.Sleep(shared.BasenineReconnectInterval * time.Second)
continue
}
if err = connection.InsertMode(); err != nil {
logger.Log.Errorf("Insert mode call failed: %v", err)
connection.Close()
time.Sleep(shared.BasenineReconnectInterval * time.Second)
continue
}
return connection
}
}

View File

@@ -5,6 +5,7 @@ type DependencyContainerType string
const (
ServiceMapGeneratorDependency = "ServiceMapGeneratorDependency"
OasGeneratorDependency = "OasGeneratorDependency"
EntriesInserter = "EntriesInserter"
EntriesProvider = "EntriesProvider"
EntriesSocketStreamer = "EntriesSocketStreamer"
EntryStreamerSocketConnector = "EntryStreamerSocketConnector"

View File

@@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect:
@mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect8/kafka/\* expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect9/kafka/\* expect

View File

@@ -3,13 +3,14 @@ package kafka
import (
"encoding/json"
"fmt"
"golang.org/x/text/cases"
"golang.org/x/text/language"
"reflect"
"sort"
"strconv"
"strings"
"golang.org/x/text/cases"
"golang.org/x/text/language"
"github.com/fatih/camelcase"
"github.com/ohler55/ojg/jp"
"github.com/ohler55/ojg/oj"
@@ -36,9 +37,14 @@ type KafkaWrapper struct {
func representRequestHeader(data map[string]interface{}, rep []interface{}) []interface{} {
requestHeader, _ := json.Marshal([]api.TableData{
{
Name: "ApiKeyName",
Value: data["apiKeyName"].(string),
Selector: `request.apiKeyName`,
},
{
Name: "ApiKey",
Value: apiNames[int(data["apiKey"].(float64))],
Value: int(data["apiKey"].(float64)),
Selector: `request.apiKey`,
},
{

View File

@@ -96,8 +96,8 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
statusQuery := ""
apiKey := ApiKey(entry.Request["apiKey"].(float64))
method := apiNames[apiKey]
methodQuery := fmt.Sprintf("request.apiKey == %d", int(entry.Request["apiKey"].(float64)))
method := entry.Request["apiKeyName"].(string)
methodQuery := fmt.Sprintf(`request.apiKeyName == "%s"`, method)
summary := ""
summaryQuery := ""

View File

@@ -11,6 +11,7 @@ import (
type Request struct {
Size int32 `json:"size"`
ApiKeyName string `json:"apiKeyName"`
ApiKey ApiKey `json:"apiKey"`
ApiVersion int16 `json:"apiVersion"`
CorrelationID int32 `json:"correlationID"`
@@ -202,6 +203,7 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, ca
request := &Request{
Size: size,
ApiKeyName: apiNames[apiKey],
ApiKey: apiKey,
ApiVersion: apiVersion,
CorrelationID: correlationID,