Compare commits

...

5 Commits

Author SHA1 Message Date
RoyUP9
8ea2dabb34 Added send to socket error validation (#1085) 2022-05-15 14:44:48 +03:00
leon-up9
366d34b8d0 Ui/fix/selectList-sticky-header-fix (#1084)
* selectList sticky header

* selectList changed & servicemap adapted

* ignore eslint

Co-authored-by: Leon <>
Co-authored-by: AmitUp9 <96980485+AmitUp9@users.noreply.github.com>
2022-05-15 14:39:09 +03:00
M. Mert Yıldıran
5fc3e38c1a Fix the Kafka ApiKey query and add ApiKeyName field (human-readable ApiKey) (#1080)
* Fix the Kafka `ApiKey` query and add `ApiKeyName` field (human-readable `ApiKey`)

* Update the dataset for Kafka unit tests

* #run_acceptance_tests
2022-05-15 09:42:32 +03:00
RoyUP9
09a0fca2c2 Extracted insert to database functionality (#1082) 2022-05-15 09:19:33 +03:00
M. Mert Yıldıran
0437586908 Replace the gRPC reference link with a better one (#1081) 2022-05-14 19:43:43 +03:00
15 changed files with 202 additions and 97 deletions

View File

@@ -373,6 +373,7 @@ func handleIncomingMessageAsTapper(socketConnection *websocket.Conn) {
func initializeDependencies() {
dependency.RegisterGenerator(dependency.ServiceMapGeneratorDependency, func() interface{} { return servicemap.GetDefaultServiceMapInstance() })
dependency.RegisterGenerator(dependency.OasGeneratorDependency, func() interface{} { return oas.GetDefaultOasGeneratorInstance() })
dependency.RegisterGenerator(dependency.EntriesInserter, func() interface{} { return api.GetBasenineEntryInserterInstance() })
dependency.RegisterGenerator(dependency.EntriesProvider, func() interface{} { return &entries.BasenineEntriesProvider{} })
dependency.RegisterGenerator(dependency.EntriesSocketStreamer, func() interface{} { return &api.BasenineEntryStreamer{} })
dependency.RegisterGenerator(dependency.EntryStreamerSocketConnector, func() interface{} { return &api.DefaultEntryStreamerSocketConnector{} })

View File

@@ -5,20 +5,19 @@ import (
basenine "github.com/up9inc/basenine/client/go"
"github.com/up9inc/mizu/agent/pkg/models"
"github.com/up9inc/mizu/logger"
tapApi "github.com/up9inc/mizu/tap/api"
)
type EntryStreamerSocketConnector interface {
SendEntry(socketId int, entry *tapApi.Entry, params *WebSocketParams)
SendMetadata(socketId int, metadata *basenine.Metadata)
SendToastError(socketId int, err error)
SendEntry(socketId int, entry *tapApi.Entry, params *WebSocketParams) error
SendMetadata(socketId int, metadata *basenine.Metadata) error
SendToastError(socketId int, err error) error
CleanupSocket(socketId int)
}
type DefaultEntryStreamerSocketConnector struct{}
func (e *DefaultEntryStreamerSocketConnector) SendEntry(socketId int, entry *tapApi.Entry, params *WebSocketParams) {
func (e *DefaultEntryStreamerSocketConnector) SendEntry(socketId int, entry *tapApi.Entry, params *WebSocketParams) error {
var message []byte
if params.EnableFullEntries {
message, _ = models.CreateFullEntryWebSocketMessage(entry)
@@ -29,26 +28,32 @@ func (e *DefaultEntryStreamerSocketConnector) SendEntry(socketId int, entry *tap
}
if err := SendToSocket(socketId, message); err != nil {
logger.Log.Error(err)
return err
}
return nil
}
func (e *DefaultEntryStreamerSocketConnector) SendMetadata(socketId int, metadata *basenine.Metadata) {
func (e *DefaultEntryStreamerSocketConnector) SendMetadata(socketId int, metadata *basenine.Metadata) error {
metadataBytes, _ := models.CreateWebsocketQueryMetadataMessage(metadata)
if err := SendToSocket(socketId, metadataBytes); err != nil {
logger.Log.Error(err)
return err
}
return nil
}
func (e *DefaultEntryStreamerSocketConnector) SendToastError(socketId int, err error) {
func (e *DefaultEntryStreamerSocketConnector) SendToastError(socketId int, err error) error {
toastBytes, _ := models.CreateWebsocketToastMessage(&models.ToastMessage{
Type: "error",
AutoClose: 5000,
Text: fmt.Sprintf("Syntax error: %s", err.Error()),
})
if err := SendToSocket(socketId, toastBytes); err != nil {
logger.Log.Error(err)
return err
}
return nil
}
func (e *DefaultEntryStreamerSocketConnector) CleanupSocket(socketId int) {

View File

@@ -25,10 +25,7 @@ import (
"github.com/up9inc/mizu/agent/pkg/utils"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared"
tapApi "github.com/up9inc/mizu/tap/api"
basenine "github.com/up9inc/basenine/client/go"
)
var k8sResolver *resolver.Resolver
@@ -103,20 +100,6 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
panic("Channel of captured messages is nil")
}
BasenineReconnect:
connection, err := basenine.NewConnection(shared.BasenineHost, shared.BaseninePort)
if err != nil {
logger.Log.Errorf("Can't establish a new connection to Basenine server: %v", err)
time.Sleep(shared.BasenineReconnectInterval * time.Second)
goto BasenineReconnect
}
if err = connection.InsertMode(); err != nil {
logger.Log.Errorf("Insert mode call failed: %v", err)
connection.Close()
time.Sleep(shared.BasenineReconnectInterval * time.Second)
goto BasenineReconnect
}
disableOASValidation := false
ctx := context.Background()
doc, contractContent, router, err := loadOAS(ctx)
@@ -163,11 +146,9 @@ BasenineReconnect:
providers.EntryAdded(len(data))
if err = connection.SendText(string(data)); err != nil {
logger.Log.Errorf("An error occured while inserting a new record to database: %v", err)
connection.Close()
time.Sleep(shared.BasenineReconnectInterval * time.Second)
goto BasenineReconnect
entryInserter := dependency.GetInstance(dependency.EntriesInserter).(EntryInserter)
if err := entryInserter.Insert(mizuEntry); err != nil {
logger.Log.Errorf("Error inserting entry, err: %v", err)
}
serviceMapGenerator := dependency.GetInstance(dependency.ServiceMapGeneratorDependency).(servicemap.ServiceMapSink)

View File

@@ -0,0 +1,71 @@
package api
import (
"encoding/json"
"fmt"
basenine "github.com/up9inc/basenine/client/go"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/tap/api"
"sync"
"time"
)
type EntryInserter interface {
Insert(entry *api.Entry) error
}
type BasenineEntryInserter struct {
connection *basenine.Connection
}
var instance *BasenineEntryInserter
var once sync.Once
func GetBasenineEntryInserterInstance() *BasenineEntryInserter {
once.Do(func() {
instance = &BasenineEntryInserter{}
})
return instance
}
func (e *BasenineEntryInserter) Insert(entry *api.Entry) error {
if e.connection == nil {
e.connection = initializeConnection()
}
data, err := json.Marshal(entry)
if err != nil {
return fmt.Errorf("error marshling entry, err: %v", err)
}
if err := e.connection.SendText(string(data)); err != nil {
e.connection.Close()
e.connection = nil
return fmt.Errorf("error sending text to database, err: %v", err)
}
return nil
}
func initializeConnection() *basenine.Connection{
for {
connection, err := basenine.NewConnection(shared.BasenineHost, shared.BaseninePort)
if err != nil {
logger.Log.Errorf("Can't establish a new connection to Basenine server: %v", err)
time.Sleep(shared.BasenineReconnectInterval * time.Second)
continue
}
if err = connection.InsertMode(); err != nil {
logger.Log.Errorf("Insert mode call failed: %v", err)
connection.Close()
time.Sleep(shared.BasenineReconnectInterval * time.Second)
continue
}
return connection
}
}

View File

@@ -34,14 +34,18 @@ func (e *BasenineEntryStreamer) Get(ctx context.Context, socketId int, params *W
meta := make(chan []byte)
query := params.Query
err = basenine.Validate(shared.BasenineHost, shared.BaseninePort, query)
if err != nil {
entryStreamerSocketConnector.SendToastError(socketId, err)
if err = basenine.Validate(shared.BasenineHost, shared.BaseninePort, query); err != nil {
if err := entryStreamerSocketConnector.SendToastError(socketId, err); err != nil {
return err
}
entryStreamerSocketConnector.CleanupSocket(socketId)
return err
}
leftOff, err := e.fetch(socketId, params, entryStreamerSocketConnector)
if err != nil {
logger.Log.Errorf("Fetch error: %v", err.Error())
logger.Log.Errorf("Fetch error: %v", err)
}
handleDataChannel := func(c *basenine.Connection, data chan []byte) {
@@ -53,13 +57,15 @@ func (e *BasenineEntryStreamer) Get(ctx context.Context, socketId int, params *W
}
var entry *tapApi.Entry
err = json.Unmarshal(bytes, &entry)
if err != nil {
logger.Log.Debugf("Error unmarshalling entry: %v", err.Error())
if err = json.Unmarshal(bytes, &entry); err != nil {
logger.Log.Debugf("Error unmarshalling entry: %v", err)
continue
}
entryStreamerSocketConnector.SendEntry(socketId, entry, params)
if err := entryStreamerSocketConnector.SendEntry(socketId, entry, params); err != nil {
logger.Log.Errorf("Error sending entry to socket, err: %v", err)
return
}
}
}
@@ -72,13 +78,15 @@ func (e *BasenineEntryStreamer) Get(ctx context.Context, socketId int, params *W
}
var metadata *basenine.Metadata
err = json.Unmarshal(bytes, &metadata)
if err != nil {
logger.Log.Debugf("Error unmarshalling metadata: %v", err.Error())
if err = json.Unmarshal(bytes, &metadata); err != nil {
logger.Log.Debugf("Error unmarshalling metadata: %v", err)
continue
}
entryStreamerSocketConnector.SendMetadata(socketId, metadata)
if err := entryStreamerSocketConnector.SendMetadata(socketId, metadata); err != nil {
logger.Log.Errorf("Error sending metadata to socket, err: %v", err)
return
}
}
}
@@ -125,28 +133,31 @@ func (e *BasenineEntryStreamer) fetch(socketId int, params *WebSocketParams, con
}
var firstMetadata *basenine.Metadata
err = json.Unmarshal(firstMeta, &firstMetadata)
if err != nil {
if err = json.Unmarshal(firstMeta, &firstMetadata); err != nil {
return
}
leftOff = firstMetadata.LeftOff
var lastMetadata *basenine.Metadata
err = json.Unmarshal(lastMeta, &lastMetadata)
if err != nil {
if err = json.Unmarshal(lastMeta, &lastMetadata); err != nil {
return
}
if err = connector.SendMetadata(socketId, lastMetadata); err != nil {
return
}
connector.SendMetadata(socketId, lastMetadata)
data = e.reverseBytesSlice(data)
for _, row := range data {
var entry *tapApi.Entry
err = json.Unmarshal(row, &entry)
if err != nil {
if err = json.Unmarshal(row, &entry); err != nil {
break
}
connector.SendEntry(socketId, entry, params)
if err = connector.SendEntry(socketId, entry, params); err != nil {
return
}
}
return
}

View File

@@ -5,6 +5,7 @@ type DependencyContainerType string
const (
ServiceMapGeneratorDependency = "ServiceMapGeneratorDependency"
OasGeneratorDependency = "OasGeneratorDependency"
EntriesInserter = "EntriesInserter"
EntriesProvider = "EntriesProvider"
EntriesSocketStreamer = "EntriesSocketStreamer"
EntryStreamerSocketConnector = "EntryStreamerSocketConnector"

View File

@@ -66,7 +66,7 @@ var grpcProtocol api.Protocol = api.Protocol{
BackgroundColor: "#244c5a",
ForegroundColor: "#ffffff",
FontSize: 11,
ReferenceLink: "https://grpc.github.io/grpc/core/md_doc_statuscodes.html",
ReferenceLink: "https://grpc.github.io/grpc/core/md_doc__p_r_o_t_o_c_o_l-_h_t_t_p2.html",
Ports: []string{"80", "443", "8080", "50051"},
Priority: 0,
}

View File

@@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect:
@mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect8/kafka/\* expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect9/kafka/\* expect

View File

@@ -3,13 +3,14 @@ package kafka
import (
"encoding/json"
"fmt"
"golang.org/x/text/cases"
"golang.org/x/text/language"
"reflect"
"sort"
"strconv"
"strings"
"golang.org/x/text/cases"
"golang.org/x/text/language"
"github.com/fatih/camelcase"
"github.com/ohler55/ojg/jp"
"github.com/ohler55/ojg/oj"
@@ -36,9 +37,14 @@ type KafkaWrapper struct {
func representRequestHeader(data map[string]interface{}, rep []interface{}) []interface{} {
requestHeader, _ := json.Marshal([]api.TableData{
{
Name: "ApiKeyName",
Value: data["apiKeyName"].(string),
Selector: `request.apiKeyName`,
},
{
Name: "ApiKey",
Value: apiNames[int(data["apiKey"].(float64))],
Value: int(data["apiKey"].(float64)),
Selector: `request.apiKey`,
},
{

View File

@@ -96,8 +96,8 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
statusQuery := ""
apiKey := ApiKey(entry.Request["apiKey"].(float64))
method := apiNames[apiKey]
methodQuery := fmt.Sprintf("request.apiKey == %d", int(entry.Request["apiKey"].(float64)))
method := entry.Request["apiKeyName"].(string)
methodQuery := fmt.Sprintf(`request.apiKeyName == "%s"`, method)
summary := ""
summaryQuery := ""

View File

@@ -11,6 +11,7 @@ import (
type Request struct {
Size int32 `json:"size"`
ApiKeyName string `json:"apiKeyName"`
ApiKey ApiKey `json:"apiKey"`
ApiVersion int16 `json:"apiVersion"`
CorrelationID int32 `json:"correlationID"`
@@ -202,6 +203,7 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, ca
request := &Request{
Size: size,
ApiKeyName: apiNames[apiKey],
ApiKey: apiKey,
ApiVersion: apiVersion,
CorrelationID: correlationID,

View File

@@ -67,7 +67,6 @@
height: 100%
display: flex
flex-direction: column
margin-right: 10px
width: 100%
border-radius: 4px
@@ -82,17 +81,18 @@
margin-top: 10px
margin-right: 10px
.protocolsFilterList, .servicesFilter
.card
background: white
padding: 10px
border-radius: 4px
user-select: none
box-shadow: 0px 1px 5px #979797
.servicesFilter
margin-top: 10px
.servicesFilterWrapper
margin-top: 20px
margin-bottom: 3px
height: 100%
overflow: hidden
border-radius: 4px
& .servicesFilterList
height: calc(100% - 30px - 52px)
.servicesFilterList
height: calc(100% - 30px - 52px)

View File

@@ -158,6 +158,7 @@ export const ServiceMapModal: React.FC<ServiceMapModalProps> = ({ isOpen, onClos
if (checkedProtocols.length === 0) {
setCheckedProtocols(getProtocolsForFilter.map(x => x.key))
}
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [getProtocolsForFilter])
useEffect(() => {
@@ -217,13 +218,13 @@ export const ServiceMapModal: React.FC<ServiceMapModalProps> = ({ isOpen, onClos
<div className={styles.filterSection + ` ${isFilterClicked ? styles.show : ""}`}>
<Resizeable minWidth={170} maxWidth={320}>
<div className={styles.filterWrapper}>
<div className={styles.protocolsFilterList}>
<div className={styles.card}>
<SelectList items={getProtocolsForFilter} checkBoxWidth="5%" tableName={"PROTOCOLS"} multiSelect={true}
checkedValues={checkedProtocols} setCheckedValues={onProtocolsChange} tableClassName={styles.filters}
inputSearchClass={styles.servicesFilterSearch} isFilterable={false}/>
</div>
<div className={styles.servicesFilter}>
<div className={styles.servicesFilterList}>
<div className={styles.servicesFilterWrapper + ` ${styles.card}`}>
<div className={styles.servicesFilterList}>
<SelectList items={getServicesForFilter} tableName={"SERVICES"} tableClassName={styles.filters} multiSelect={true}
checkBoxWidth="5%" checkedValues={checkedServices} setCheckedValues={onServiceChanges} inputSearchClass={styles.servicesFilterSearch}/>
</div>

View File

@@ -85,7 +85,7 @@ const SelectList: React.FC<Props> = ({ items, tableName, checkedValues = [], mul
const tableBody = filteredValues.length === 0 ?
<tr>
<td colSpan={2}>
<td colSpan={2} className={styles.displayBlock}>
<NoDataMessage messageText={noItemsMessage} />
</td>
</tr>

View File

@@ -1,44 +1,70 @@
@import '../../../variables.module'
@import '../../../components'
.selectListTable
overflow: auto
height: 100%
user-select: none // when resizble moved we get unwanted beheviour
height: 100%
table
width: 100%
margin-top: 20px
border-collapse: collapse
table-layout: fixed
height: 100%
display: flex
flex-flow: column
height: 100%
th
color: $blue-gray
text-align: left
padding: 10px
position: sticky
top: 0
background: $main-background-color
font-size: 12px
thead
display: table
table-layout: fixed
flex: 0 0 auto
width: calc(100% - 0.9em)
tr
border-bottom-width: 1px
border-bottom-color: $data-background-color
border-bottom-style: solid
width: 100%
tbody
display: block
overflow: auto
width: 100%
height: 100%
flex: 1 1 auto
td
color: $light-gray
padding: 10px
font-size: 11px
font-weight: 600
padding-top: 5px
padding-bottom: 5px
tbody tr:hover
background: $header-background-color
th
color: $blue-gray
text-align: left
padding: 10px
background: $main-background-color
font-size: 12px
tr
border-bottom-width: 1px
border-bottom-color: $data-background-color
border-bottom-style: solid
width: 100%
display: block
position: relative
display: table
table-layout: fixed
td
color: $light-gray
padding: 10px
font-size: 11px
font-weight: 600
padding-top: 5px
padding-bottom: 5px
.nowrap
white-space: nowrap
.totalSelected
font-size: 12px
color: $light-blue-color
font-weight: 700
font-size: 12px
color: $light-blue-color
font-weight: 700
.displayBlock
display: block
.filterInput
margin-bottom: 20px