Mirror of https://github.com/kubeshark/kubeshark.git, synced 2026-02-16 02:50:03 +00:00

Compare commits (2 commits)

| Author | SHA1 | Date |
|---|---|---|
|  | 8c187179b0 |  |
|  | d7d802830f |  |
@@ -69,72 +69,15 @@ func GetGeneralStats() GeneralStats {
    return generalStats
}

func getBucketStatsCopy() BucketStats {
    bucketStatsCopy := BucketStats{}
    bucketStatsLocker.Lock()
    if err := copier.Copy(&bucketStatsCopy, bucketsStats); err != nil {
        logger.Log.Errorf("Error while copying src stats into temporary copied object")
        return nil
    }
    bucketStatsLocker.Unlock()
    return bucketStatsCopy
}

func GetAccumulativeStats() []*AccumulativeStatsProtocol {
    bucketStatsCopy := getBucketStatsCopy()
    if bucketStatsCopy == nil {
    if len(bucketStatsCopy) == 0 {
        return make([]*AccumulativeStatsProtocol, 0)
    }
    protocolToColor := make(map[string]string, 0)
    methodsPerProtocolAggregated := make(map[string]map[string]*AccumulativeStatsCounter, 0)
    for _, countersOfTimeFrame := range bucketStatsCopy {
        for protocolName, value := range countersOfTimeFrame.ProtocolStats {
            if _, ok := protocolToColor[protocolName]; !ok {
                protocolToColor[protocolName] = value.Color
            }

            for method, countersValue := range value.MethodsStats {
                if _, found := methodsPerProtocolAggregated[protocolName]; !found {
                    methodsPerProtocolAggregated[protocolName] = map[string]*AccumulativeStatsCounter{}
                }
                if _, found := methodsPerProtocolAggregated[protocolName][method]; !found {
                    methodsPerProtocolAggregated[protocolName][method] = &AccumulativeStatsCounter{
                        Name: method,
                        EntriesCount: 0,
                        VolumeSizeBytes: 0,
                    }
                }
                methodsPerProtocolAggregated[protocolName][method].EntriesCount += countersValue.EntriesCount
                methodsPerProtocolAggregated[protocolName][method].VolumeSizeBytes += countersValue.VolumeInBytes
            }
        }
    }
    methodsPerProtocolAggregated, protocolToColor := getAggregatedStatsAllTime(bucketStatsCopy)

    return ConvertToPieData(methodsPerProtocolAggregated, protocolToColor)
}

func ConvertToPieData(methodsPerProtocolAggregated map[string]map[string]*AccumulativeStatsCounter, protocolToColor map[string]string) []*AccumulativeStatsProtocol {
    protocolsData := make([]*AccumulativeStatsProtocol, 0)
    for protocolName, value := range methodsPerProtocolAggregated {
        entriesCount := 0
        volumeSizeBytes := 0
        methods := make([]*AccumulativeStatsCounter, 0)
        for _, methodAccData := range value {
            entriesCount += methodAccData.EntriesCount
            volumeSizeBytes += methodAccData.VolumeSizeBytes
            methods = append(methods, methodAccData)
        }
        protocolsData = append(protocolsData, &AccumulativeStatsProtocol{
            AccumulativeStatsCounter: AccumulativeStatsCounter{
                Name: protocolName,
                EntriesCount: entriesCount,
                VolumeSizeBytes: volumeSizeBytes,
            },
            Color: protocolToColor[protocolName],
            Methods: methods,
        })
    }
    return protocolsData
    return convertAccumulativeStatsDictToArray(methodsPerProtocolAggregated, protocolToColor)
}

func GetAccumulativeStatsTiming(intervalSeconds int, numberOfBars int) []*AccumulativeStatsProtocolTime {
@@ -143,79 +86,11 @@ func GetAccumulativeStatsTiming(intervalSeconds int, numberOfBars int) []*Accumu
        return make([]*AccumulativeStatsProtocolTime, 0)
    }

    protocolToColor := make(map[string]string, 0)
    methodsPerProtocolPerTimeAggregated := make(map[time.Time]map[string]map[string]*AccumulativeStatsCounter, 0)
    firstBucketTime := getFirstBucketTime(time.Now().UTC(), intervalSeconds, numberOfBars)

    // TODO: Extract to function and add tests for those values
    lastBucketTime := time.Now().UTC().Add(-1 * InternalBucketThreshold / 2).Round(InternalBucketThreshold)
    firstBucketTime := lastBucketTime.Add(-1 * time.Second * time.Duration(intervalSeconds*(numberOfBars-1)))
    bucketStatsIndex := len(bucketStatsCopy) - 1
    methodsPerProtocolPerTimeAggregated, protocolToColor := getAggregatedResultTimingFromSpecificTime(intervalSeconds, bucketStatsCopy, firstBucketTime)

    for bucketStatsIndex >= 0 {
        currentBucketTime := bucketStatsCopy[bucketStatsIndex].BucketTime
        if currentBucketTime.After(firstBucketTime) || currentBucketTime.Equal(firstBucketTime) {
            resultBucketRoundedKey := currentBucketTime.Add(-1 * time.Second * time.Duration(intervalSeconds) / 2).Round(time.Second * time.Duration(intervalSeconds))

            for protocolName, data := range bucketStatsCopy[bucketStatsIndex].ProtocolStats {
                if _, ok := protocolToColor[protocolName]; !ok {
                    protocolToColor[protocolName] = data.Color
                }

                for methodName, dataOfMethod := range data.MethodsStats {

                    if _, ok := methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey]; !ok {
                        methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey] = map[string]map[string]*AccumulativeStatsCounter{}
                    }
                    if _, ok := methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName]; !ok {
                        methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName] = map[string]*AccumulativeStatsCounter{}
                    }
                    if _, ok := methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName]; !ok {
                        methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName] = &AccumulativeStatsCounter{
                            Name: methodName,
                            EntriesCount: 0,
                            VolumeSizeBytes: 0,
                        }
                    }
                    methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName].EntriesCount += dataOfMethod.EntriesCount
                    methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName].VolumeSizeBytes += dataOfMethod.VolumeInBytes
                }
            }
        }
        bucketStatsIndex--
    }

    return ConvertToTimelineData(methodsPerProtocolPerTimeAggregated, protocolToColor)
}

func ConvertToTimelineData(methodsPerProtocolPerTimeAggregated map[time.Time]map[string]map[string]*AccumulativeStatsCounter, protocolToColor map[string]string) []*AccumulativeStatsProtocolTime {
    finalResult := make([]*AccumulativeStatsProtocolTime, 0)
    for timeKey, item := range methodsPerProtocolPerTimeAggregated {
        protocolsData := make([]*AccumulativeStatsProtocol, 0)
        for protocolName := range item {
            entriesCount := 0
            volumeSizeBytes := 0
            methods := make([]*AccumulativeStatsCounter, 0)
            for _, methodAccData := range methodsPerProtocolPerTimeAggregated[timeKey][protocolName] {
                entriesCount += methodAccData.EntriesCount
                volumeSizeBytes += methodAccData.VolumeSizeBytes
                methods = append(methods, methodAccData)
            }
            protocolsData = append(protocolsData, &AccumulativeStatsProtocol{
                AccumulativeStatsCounter: AccumulativeStatsCounter{
                    Name: protocolName,
                    EntriesCount: entriesCount,
                    VolumeSizeBytes: volumeSizeBytes,
                },
                Color: protocolToColor[protocolName],
                Methods: methods,
            })
        }
        finalResult = append(finalResult, &AccumulativeStatsProtocolTime{
            Time: timeKey.UnixMilli(),
            ProtocolsData: protocolsData,
        })
    }
    return finalResult
    return convertAccumulativeStatsTimelineDictToArray(methodsPerProtocolPerTimeAggregated, protocolToColor)
}

func EntryAdded(size int, summery *api.BaseEntry) {
@@ -233,14 +108,8 @@ func EntryAdded(size int, summery *api.BaseEntry) {
    generalStats.LastEntryTimestamp = currentTimestamp
}

//GetBucketOfTimeStamp Round the entry to the nearest threshold (one minute) floored (e.g: 15:31:45 -> 15:31:00)
func GetBucketOfTimeStamp(timestamp int64) time.Time {
    entryTimeStampAsTime := time.UnixMilli(timestamp)
    return entryTimeStampAsTime.Add(-1 * InternalBucketThreshold / 2).Round(InternalBucketThreshold)
}

func addToBucketStats(size int, summery *api.BaseEntry) {
    entryTimeBucketRounded := GetBucketOfTimeStamp(summery.Timestamp)
    entryTimeBucketRounded := getBucketFromTimeStamp(summery.Timestamp)

    if len(bucketsStats) == 0 {
        bucketsStats = append(bucketsStats, &TimeFrameStatsValue{
@@ -272,3 +141,148 @@ func addToBucketStats(size int, summery *api.BaseEntry) {
    bucketOfEntry.ProtocolStats[summery.Protocol.Abbreviation].MethodsStats[summery.Method].EntriesCount += 1
    bucketOfEntry.ProtocolStats[summery.Protocol.Abbreviation].MethodsStats[summery.Method].VolumeInBytes += size
}

func getBucketFromTimeStamp(timestamp int64) time.Time {
    entryTimeStampAsTime := time.UnixMilli(timestamp)
    return entryTimeStampAsTime.Add(-1 * InternalBucketThreshold / 2).Round(InternalBucketThreshold)
}

func getFirstBucketTime(endTime time.Time, intervalSeconds int, numberOfBars int) time.Time {
    lastBucketTime := endTime.Add(-1 * time.Second * time.Duration(intervalSeconds) / 2).Round(time.Second * time.Duration(intervalSeconds))
    firstBucketTime := lastBucketTime.Add(-1 * time.Second * time.Duration(intervalSeconds*(numberOfBars-1)))
    return firstBucketTime
}

func convertAccumulativeStatsTimelineDictToArray(methodsPerProtocolPerTimeAggregated map[time.Time]map[string]map[string]*AccumulativeStatsCounter, protocolToColor map[string]string) []*AccumulativeStatsProtocolTime {
    finalResult := make([]*AccumulativeStatsProtocolTime, 0)
    for timeKey, item := range methodsPerProtocolPerTimeAggregated {
        protocolsData := make([]*AccumulativeStatsProtocol, 0)
        for protocolName := range item {
            entriesCount := 0
            volumeSizeBytes := 0
            methods := make([]*AccumulativeStatsCounter, 0)
            for _, methodAccData := range methodsPerProtocolPerTimeAggregated[timeKey][protocolName] {
                entriesCount += methodAccData.EntriesCount
                volumeSizeBytes += methodAccData.VolumeSizeBytes
                methods = append(methods, methodAccData)
            }
            protocolsData = append(protocolsData, &AccumulativeStatsProtocol{
                AccumulativeStatsCounter: AccumulativeStatsCounter{
                    Name: protocolName,
                    EntriesCount: entriesCount,
                    VolumeSizeBytes: volumeSizeBytes,
                },
                Color: protocolToColor[protocolName],
                Methods: methods,
            })
        }
        finalResult = append(finalResult, &AccumulativeStatsProtocolTime{
            Time: timeKey.UnixMilli(),
            ProtocolsData: protocolsData,
        })
    }
    return finalResult
}

func convertAccumulativeStatsDictToArray(methodsPerProtocolAggregated map[string]map[string]*AccumulativeStatsCounter, protocolToColor map[string]string) []*AccumulativeStatsProtocol {
    protocolsData := make([]*AccumulativeStatsProtocol, 0)
    for protocolName, value := range methodsPerProtocolAggregated {
        entriesCount := 0
        volumeSizeBytes := 0
        methods := make([]*AccumulativeStatsCounter, 0)
        for _, methodAccData := range value {
            entriesCount += methodAccData.EntriesCount
            volumeSizeBytes += methodAccData.VolumeSizeBytes
            methods = append(methods, methodAccData)
        }
        protocolsData = append(protocolsData, &AccumulativeStatsProtocol{
            AccumulativeStatsCounter: AccumulativeStatsCounter{
                Name: protocolName,
                EntriesCount: entriesCount,
                VolumeSizeBytes: volumeSizeBytes,
            },
            Color: protocolToColor[protocolName],
            Methods: methods,
        })
    }
    return protocolsData
}

func getBucketStatsCopy() BucketStats {
    bucketStatsCopy := BucketStats{}
    bucketStatsLocker.Lock()
    if err := copier.Copy(&bucketStatsCopy, bucketsStats); err != nil {
        logger.Log.Errorf("Error while copying src stats into temporary copied object")
        return nil
    }
    bucketStatsLocker.Unlock()
    return bucketStatsCopy
}

func getAggregatedResultTimingFromSpecificTime(intervalSeconds int, bucketStats BucketStats, firstBucketTime time.Time) (map[time.Time]map[string]map[string]*AccumulativeStatsCounter, map[string]string) {
    protocolToColor := map[string]string{}
    methodsPerProtocolPerTimeAggregated := map[time.Time]map[string]map[string]*AccumulativeStatsCounter{}

    bucketStatsIndex := len(bucketStats) - 1
    for bucketStatsIndex >= 0 {
        currentBucketTime := bucketStats[bucketStatsIndex].BucketTime
        if currentBucketTime.After(firstBucketTime) || currentBucketTime.Equal(firstBucketTime) {
            resultBucketRoundedKey := currentBucketTime.Add(-1 * time.Second * time.Duration(intervalSeconds) / 2).Round(time.Second * time.Duration(intervalSeconds))

            for protocolName, data := range bucketStats[bucketStatsIndex].ProtocolStats {
                if _, ok := protocolToColor[protocolName]; !ok {
                    protocolToColor[protocolName] = data.Color
                }

                for methodName, dataOfMethod := range data.MethodsStats {

                    if _, ok := methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey]; !ok {
                        methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey] = map[string]map[string]*AccumulativeStatsCounter{}
                    }
                    if _, ok := methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName]; !ok {
                        methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName] = map[string]*AccumulativeStatsCounter{}
                    }
                    if _, ok := methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName]; !ok {
                        methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName] = &AccumulativeStatsCounter{
                            Name: methodName,
                            EntriesCount: 0,
                            VolumeSizeBytes: 0,
                        }
                    }
                    methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName].EntriesCount += dataOfMethod.EntriesCount
                    methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName].VolumeSizeBytes += dataOfMethod.VolumeInBytes
                }
            }
        }
        bucketStatsIndex--
    }
    return methodsPerProtocolPerTimeAggregated, protocolToColor
}

func getAggregatedStatsAllTime(bucketStatsCopy BucketStats) (map[string]map[string]*AccumulativeStatsCounter, map[string]string) {
    protocolToColor := make(map[string]string, 0)
    methodsPerProtocolAggregated := make(map[string]map[string]*AccumulativeStatsCounter, 0)
    for _, countersOfTimeFrame := range bucketStatsCopy {
        for protocolName, value := range countersOfTimeFrame.ProtocolStats {
            if _, ok := protocolToColor[protocolName]; !ok {
                protocolToColor[protocolName] = value.Color
            }

            for method, countersValue := range value.MethodsStats {
                if _, found := methodsPerProtocolAggregated[protocolName]; !found {
                    methodsPerProtocolAggregated[protocolName] = map[string]*AccumulativeStatsCounter{}
                }
                if _, found := methodsPerProtocolAggregated[protocolName][method]; !found {
                    methodsPerProtocolAggregated[protocolName][method] = &AccumulativeStatsCounter{
                        Name: method,
                        EntriesCount: 0,
                        VolumeSizeBytes: 0,
                    }
                }
                methodsPerProtocolAggregated[protocolName][method].EntriesCount += countersValue.EntriesCount
                methodsPerProtocolAggregated[protocolName][method].VolumeSizeBytes += countersValue.VolumeInBytes
            }
        }
    }
    return methodsPerProtocolAggregated, protocolToColor
}
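Both bucket helpers above, getBucketFromTimeStamp and getFirstBucketTime, lean on the same Add(-threshold/2).Round(threshold) expression: shifting the input back by half a threshold turns Go's round-to-nearest into a floor, which is exactly the "15:31:45 -> 15:31:00" behavior the GetBucketOfTimeStamp doc comment describes. The following is a minimal standalone sketch of that idiom; it is not part of either commit, the floorToBucket name is hypothetical, and internalBucketThreshold = time.Minute is assumed from the one-minute doc comment.

```go
package main

import (
    "fmt"
    "time"
)

// Assumed value: the doc comment in the diff says buckets are one minute wide.
const internalBucketThreshold = time.Minute

// floorToBucket (hypothetical helper) mirrors the Add(-threshold/2).Round(threshold)
// idiom used by getBucketFromTimeStamp and getFirstBucketTime: subtracting half a
// threshold before Round makes round-to-nearest behave like a floor.
func floorToBucket(t time.Time, threshold time.Duration) time.Time {
    return t.Add(-1 * threshold / 2).Round(threshold)
}

func main() {
    ts := time.Date(2022, time.January, 1, 15, 31, 45, 0, time.UTC)
    fmt.Println(floorToBucket(ts, internalBucketThreshold)) // 2022-01-01 15:31:00 +0000 UTC
}
```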
agent/pkg/providers/stats_provider_internal_test.go (new file, 331 lines)
@@ -0,0 +1,331 @@
package providers

import (
    "fmt"
    "reflect"
    "testing"
    "time"
)

func TestGetBucketOfTimeStamp(t *testing.T) {
    tests := map[int64]time.Time{
        time.Date(2022, time.Month(1), 1, 10, 34, 45, 0, time.Local).UnixMilli(): time.Date(2022, time.Month(1), 1, 10, 34, 00, 0, time.Local),
        time.Date(2022, time.Month(1), 1, 10, 34, 00, 0, time.Local).UnixMilli(): time.Date(2022, time.Month(1), 1, 10, 34, 00, 0, time.Local),
        time.Date(2022, time.Month(1), 1, 10, 59, 01, 0, time.Local).UnixMilli(): time.Date(2022, time.Month(1), 1, 10, 59, 00, 0, time.Local),
    }

    for key, value := range tests {
        t.Run(fmt.Sprintf("%v", key), func(t *testing.T) {

            actual := getBucketFromTimeStamp(key)

            if actual != value {
                t.Errorf("unexpected result - expected: %v, actual: %v", value, actual)
            }
        })
    }
}

type DataForBucketBorderFunction struct {
    EndTime time.Time
    IntervalInSeconds int
    NumberOfBars int
}

func TestGetBucketBorders(t *testing.T) {
    tests := map[DataForBucketBorderFunction]time.Time{
        DataForBucketBorderFunction{
            time.Date(2022, time.Month(1), 1, 10, 34, 45, 0, time.UTC),
            300,
            10,
        }: time.Date(2022, time.Month(1), 1, 9, 45, 0, 0, time.UTC),
        DataForBucketBorderFunction{
            time.Date(2022, time.Month(1), 1, 10, 35, 45, 0, time.UTC),
            60,
            5,
        }: time.Date(2022, time.Month(1), 1, 10, 31, 00, 0, time.UTC),
    }

    for key, value := range tests {
        t.Run(fmt.Sprintf("%v", key), func(t *testing.T) {

            actual := getFirstBucketTime(key.EndTime, key.IntervalInSeconds, key.NumberOfBars)

            if actual != value {
                t.Errorf("unexpected result - expected: %v, actual: %v", value, actual)
            }
        })
    }
}

func TestGetAggregatedStatsAllTime(t *testing.T) {
    bucketStatsForTest := BucketStats{
        &TimeFrameStatsValue{
            BucketTime: time.Date(2022, time.Month(1), 1, 10, 00, 00, 0, time.UTC),
            ProtocolStats: map[string]ProtocolStats{
                "http": {
                    MethodsStats: map[string]*SizeAndEntriesCount{
                        "get": {
                            EntriesCount: 1,
                            VolumeInBytes: 2,
                        },
                        "post": {
                            EntriesCount: 2,
                            VolumeInBytes: 3,
                        },
                    },
                },
                "kafka": {
                    MethodsStats: map[string]*SizeAndEntriesCount{
                        "listTopics": {
                            EntriesCount: 5,
                            VolumeInBytes: 6,
                        },
                    },
                },
            },
        },
        &TimeFrameStatsValue{
            BucketTime: time.Date(2022, time.Month(1), 1, 10, 01, 00, 0, time.UTC),
            ProtocolStats: map[string]ProtocolStats{
                "http": {
                    MethodsStats: map[string]*SizeAndEntriesCount{
                        "get": {
                            EntriesCount: 1,
                            VolumeInBytes: 2,
                        },
                        "post": {
                            EntriesCount: 2,
                            VolumeInBytes: 3,
                        },
                    },
                },
                "redis": {
                    MethodsStats: map[string]*SizeAndEntriesCount{
                        "set": {
                            EntriesCount: 5,
                            VolumeInBytes: 6,
                        },
                    },
                },
            },
        },
    }

    expected := map[string]map[string]*AccumulativeStatsCounter{
        "http": {
            "post": {
                Name: "post",
                EntriesCount: 4,
                VolumeSizeBytes: 6,
            },
            "get": {
                Name: "get",
                EntriesCount: 2,
                VolumeSizeBytes: 4,
            },
        },
        "kafka": {
            "listTopics": {
                Name: "listTopics",
                EntriesCount: 5,
                VolumeSizeBytes: 6,
            },
        },
        "redis": {
            "set": {
                Name: "set",
                EntriesCount: 5,
                VolumeSizeBytes: 6,
            },
        },
    }
    actual, _ := getAggregatedStatsAllTime(bucketStatsForTest)

    if !reflect.DeepEqual(actual, expected) {
        t.Errorf("unexpected result - expected: %v, actual: %v", 3, len(actual))
    }
}

func TestGetAggregatedStatsFromSpecificTime(t *testing.T) {
    bucketStatsForTest := BucketStats{
        &TimeFrameStatsValue{
            BucketTime: time.Date(2022, time.Month(1), 1, 10, 00, 00, 0, time.UTC),
            ProtocolStats: map[string]ProtocolStats{
                "http": {
                    MethodsStats: map[string]*SizeAndEntriesCount{
                        "get": {
                            EntriesCount: 1,
                            VolumeInBytes: 2,
                        },
                    },
                },
                "kafka": {
                    MethodsStats: map[string]*SizeAndEntriesCount{
                        "listTopics": {
                            EntriesCount: 5,
                            VolumeInBytes: 6,
                        },
                    },
                },
            },
        },
        &TimeFrameStatsValue{
            BucketTime: time.Date(2022, time.Month(1), 1, 10, 01, 00, 0, time.UTC),
            ProtocolStats: map[string]ProtocolStats{
                "http": {
                    MethodsStats: map[string]*SizeAndEntriesCount{
                        "get": {
                            EntriesCount: 1,
                            VolumeInBytes: 2,
                        },
                        "post": {
                            EntriesCount: 2,
                            VolumeInBytes: 3,
                        },
                    },
                },
                "redis": {
                    MethodsStats: map[string]*SizeAndEntriesCount{
                        "set": {
                            EntriesCount: 5,
                            VolumeInBytes: 6,
                        },
                    },
                },
            },
        },
    }

    expected := map[time.Time]map[string]map[string]*AccumulativeStatsCounter{
        time.Date(2022, time.Month(1), 1, 10, 00, 00, 0, time.UTC): {
            "http": {
                "post": {
                    Name: "post",
                    EntriesCount: 2,
                    VolumeSizeBytes: 3,
                },
                "get": {
                    Name: "get",
                    EntriesCount: 2,
                    VolumeSizeBytes: 4,
                },
            },
            "kafka": {
                "listTopics": {
                    Name: "listTopics",
                    EntriesCount: 5,
                    VolumeSizeBytes: 6,
                },
            },
            "redis": {
                "set": {
                    Name: "set",
                    EntriesCount: 5,
                    VolumeSizeBytes: 6,
                },
            },
        },
    }
    actual, _ := getAggregatedResultTimingFromSpecificTime(300, bucketStatsForTest, time.Date(2022, time.Month(1), 1, 10, 00, 00, 0, time.UTC))

    if !reflect.DeepEqual(actual, expected) {
        t.Errorf("unexpected result - expected: %v, actual: %v", 3, len(actual))
    }
}

func TestGetAggregatedStatsFromSpecificTimeMultipleBuckets(t *testing.T) {
    bucketStatsForTest := BucketStats{
        &TimeFrameStatsValue{
            BucketTime: time.Date(2022, time.Month(1), 1, 10, 00, 00, 0, time.UTC),
            ProtocolStats: map[string]ProtocolStats{
                "http": {
                    MethodsStats: map[string]*SizeAndEntriesCount{
                        "get": {
                            EntriesCount: 1,
                            VolumeInBytes: 2,
                        },
                    },
                },
                "kafka": {
                    MethodsStats: map[string]*SizeAndEntriesCount{
                        "listTopics": {
                            EntriesCount: 5,
                            VolumeInBytes: 6,
                        },
                    },
                },
            },
        },
        &TimeFrameStatsValue{
            BucketTime: time.Date(2022, time.Month(1), 1, 10, 01, 00, 0, time.UTC),
            ProtocolStats: map[string]ProtocolStats{
                "http": {
                    MethodsStats: map[string]*SizeAndEntriesCount{
                        "get": {
                            EntriesCount: 1,
                            VolumeInBytes: 2,
                        },
                        "post": {
                            EntriesCount: 2,
                            VolumeInBytes: 3,
                        },
                    },
                },
                "redis": {
                    MethodsStats: map[string]*SizeAndEntriesCount{
                        "set": {
                            EntriesCount: 5,
                            VolumeInBytes: 6,
                        },
                    },
                },
            },
        },
    }

    expected := map[time.Time]map[string]map[string]*AccumulativeStatsCounter{
        time.Date(2022, time.Month(1), 1, 10, 00, 00, 0, time.UTC): {
            "http": {
                "get": {
                    Name: "get",
                    EntriesCount: 1,
                    VolumeSizeBytes: 2,
                },
            },
            "kafka": {
                "listTopics": {
                    Name: "listTopics",
                    EntriesCount: 5,
                    VolumeSizeBytes: 6,
                },
            },
        },
        time.Date(2022, time.Month(1), 1, 10, 01, 00, 0, time.UTC): {
            "http": {
                "post": {
                    Name: "post",
                    EntriesCount: 2,
                    VolumeSizeBytes: 3,
                },
                "get": {
                    Name: "get",
                    EntriesCount: 1,
                    VolumeSizeBytes: 2,
                },
            },
            "redis": {
                "set": {
                    Name: "set",
                    EntriesCount: 5,
                    VolumeSizeBytes: 6,
                },
            },
        },
    }
    actual, _ := getAggregatedResultTimingFromSpecificTime(60, bucketStatsForTest, time.Date(2022, time.Month(1), 1, 10, 00, 00, 0, time.UTC))

    if !reflect.DeepEqual(actual, expected) {
        t.Errorf("unexpected result - expected: %v, actual: %v", 3, len(actual))
    }
}
@@ -81,24 +81,4 @@ func TestEntryAddedVolume(t *testing.T) {
            }
        })
    }

}

func TestGetBucketOfTimeStamp(t *testing.T) {
    tests := map[int64]time.Time{
        time.Date(2022, time.Month(1), 1, 10, 34, 45, 0, time.Local).UnixMilli(): time.Date(2022, time.Month(1), 1, 10, 34, 00, 0, time.Local),
        time.Date(2022, time.Month(1), 1, 10, 34, 00, 0, time.Local).UnixMilli(): time.Date(2022, time.Month(1), 1, 10, 34, 00, 0, time.Local),
        time.Date(2022, time.Month(1), 1, 10, 59, 01, 0, time.Local).UnixMilli(): time.Date(2022, time.Month(1), 1, 10, 59, 00, 0, time.Local),
    }

    for key, value := range tests {
        t.Run(fmt.Sprintf("%v", key), func(t *testing.T) {

            actual := providers.GetBucketOfTimeStamp(key)

            if actual != value {
                t.Errorf("unexpected result - expected: %v, actual: %v", value, actual)
            }
        })
    }
}
@@ -20,7 +20,7 @@
    "test:lint": "eslint .",
    "predeploy": "cd example && npm install && npm run build",
    "deploy": "gh-pages -d example/build",
    "eslint": "eslint . --ext .js,.jsx,.ts,.tsx"
    "eslint": "eslint . --ext .js,.jsx,.ts,.tsx --max-warnings=0"
  },
  "peerDependencies": {
    "@craco/craco": "^6.4.3",
@@ -109,7 +109,7 @@ export const EntriesList: React.FC<EntriesListProps> = ({
    if (scrollTo) {
      scrollableRef.current.scrollToIndex(data.data.length - 1);
    }
  }, [setLoadMoreTop, setIsLoadingTop, entries, setEntries, query, setNoMoreDataTop, leftOffTop, setLeftOffTop, setQueriedTotal, setTruncatedTimestamp, scrollableRef]);
  }, [setLoadMoreTop, setIsLoadingTop, entries, setEntries, query, setNoMoreDataTop, leftOffTop, setLeftOffTop, setQueriedTotal, setTruncatedTimestamp, scrollableRef, trafficViewerApi]);

  useEffect(() => {
    if (!isWsConnectionClosed || !loadMoreTop || noMoreDataTop) return;
@@ -121,7 +121,7 @@ export const EntriesList: React.FC<EntriesListProps> = ({
  useEffect(() => {
    if (!focusedEntryId && entries.length > 0)
      setFocusedEntryId(entries[0].id);
  }, [focusedEntryId, entries])
  }, [focusedEntryId, entries, setFocusedEntryId])

  useEffect(() => {
    const newEntries = [...entries];
@@ -131,7 +131,7 @@ export const EntriesList: React.FC<EntriesListProps> = ({
      setNoMoreDataTop(false);
      setEntries(newEntries);
    }
  }, [entries])
  }, [entries, setLeftOffTop, setNoMoreDataTop, setEntries])

  if(ws.current && !ws.current.onmessage) {
    ws.current.onmessage = (e) => {

@@ -15,7 +15,6 @@ import outgoingIconFailure from "assets/outgoing-traffic-failure.svg"
import outgoingIconNeutral from "assets/outgoing-traffic-neutral.svg"
import {useRecoilState} from "recoil";
import focusedEntryIdAtom from "../../recoil/focusedEntryId";
import queryAtom from "../../recoil/query";

interface TCPInterface {
  ip: string
@@ -66,7 +65,6 @@ enum CaptureTypes {
export const EntryItem: React.FC<EntryProps> = ({entry, style, headingMode, namespace}) => {

  const [focusedEntryId, setFocusedEntryId] = useRecoilState(focusedEntryIdAtom);
  const [queryState, setQuery] = useRecoilState(queryAtom);
  const isSelected = focusedEntryId === entry.id;

  const classification = getClassification(entry.status)

@@ -1,4 +1,4 @@
import React, { useEffect, useRef, useState } from "react";
import React, {useCallback, useEffect, useRef, useState} from "react";
import { Filters } from "../Filters/Filters";
import { EntriesList } from "../EntriesList/EntriesList";
import makeStyles from '@mui/styles/makeStyles';
@@ -59,7 +59,6 @@ export const TrafficViewer: React.FC<TrafficViewerProps> = ({
}) => {

  const classes = useLayoutStyles();

  const setEntries = useSetRecoilState(entriesAtom);
  const setFocusedEntryId = useSetRecoilState(focusedEntryIdAtom);
  const query = useRecoilValue(queryAtom);
@@ -68,40 +67,44 @@ export const TrafficViewer: React.FC<TrafficViewerProps> = ({
  const [noMoreDataTop, setNoMoreDataTop] = useState(false);
  const [isSnappedToBottom, setIsSnappedToBottom] = useState(true);
  const [wsReadyState, setWsReadyState] = useState(0);

  const setLeftOffTop = useSetRecoilState(leftOffTopAtom);
  const scrollableRef = useRef(null);


  const ws = useRef(null);


  const closeWebSocket = useCallback(() => {
    if (ws?.current?.readyState === WebSocket.OPEN) {
      ws.current.close();
      return true;
    }
  }, [])

  useEffect(() => {
    if(shouldCloseWebSocket){
      closeWebSocket()
      setShouldCloseWebSocket(false);
    }
  }, [shouldCloseWebSocket])
  }, [shouldCloseWebSocket, setShouldCloseWebSocket, closeWebSocket])

  useEffect(() => {
    reopenConnection()
  }, [webSocketUrl])

  const ws = useRef(null);

  const openEmptyWebSocket = () => {
    openWebSocket(DEFAULT_LEFTOFF, query, true, DEFAULT_FETCH, DEFAULT_FETCH_TIMEOUT_MS);
  }

  const closeWebSocket = () => {
    if (ws?.current?.readyState === WebSocket.OPEN) {
      ws.current.close();
      return true;
    }
  }
  const sendQueryWhenWsOpen = useCallback((leftOff: string, query: string, fetch: number, fetchTimeoutMs: number) => {
    setTimeout(() => {
      if (ws?.current?.readyState === WebSocket.OPEN) {
        ws.current.send(JSON.stringify({
          "leftOff": leftOff,
          "query": query,
          "enableFullEntries": false,
          "fetch": fetch,
          "timeoutMs": fetchTimeoutMs
        }));
      } else {
        sendQueryWhenWsOpen(leftOff, query, fetch, fetchTimeoutMs);
      }
    }, 500)
  }, [])

  const listEntry = useRef(null);
  const openWebSocket = (leftOff: string, query: string, resetEntries: boolean, fetch: number, fetchTimeoutMs: number) => {
  const openWebSocket = useCallback((leftOff: string, query: string, resetEntries: boolean, fetch: number, fetchTimeoutMs: number) => {
    if (resetEntries) {
      setFocusedEntryId(null);
      setEntries([]);
@@ -127,24 +130,11 @@ export const TrafficViewer: React.FC<TrafficViewerProps> = ({
      }
    } catch (e) {
    }
  }

  const sendQueryWhenWsOpen = (leftOff: string, query: string, fetch: number, fetchTimeoutMs: number) => {
    setTimeout(() => {
      if (ws?.current?.readyState === WebSocket.OPEN) {
        ws.current.send(JSON.stringify({
          "leftOff": leftOff,
          "query": query,
          "enableFullEntries": false,
          "fetch": fetch,
          "timeoutMs": fetchTimeoutMs
        }));
      } else {
        sendQueryWhenWsOpen(leftOff, query, fetch, fetchTimeoutMs);
      }
    }, 500)
  }
  }, [setFocusedEntryId, setEntries, setLeftOffTop, setNoMoreDataTop, ws, sendQueryWhenWsOpen, webSocketUrl])

  const openEmptyWebSocket = useCallback(() => {
    openWebSocket(DEFAULT_LEFTOFF, query, true, DEFAULT_FETCH, DEFAULT_FETCH_TIMEOUT_MS);
  }, [openWebSocket, query])

  useEffect(() => {
    setTrafficViewerApiState({...trafficViewerApiProp, webSocket: {close: closeWebSocket}});
@@ -156,7 +146,7 @@ export const TrafficViewer: React.FC<TrafficViewerProps> = ({
      console.error(error);
    }
  })()
  }, []);
  }, [trafficViewerApiProp, closeWebSocket, setTappingStatus, setTrafficViewerApiState]);

  const toggleConnection = () => {
    if (!closeWebSocket()) {
@@ -166,12 +156,17 @@ export const TrafficViewer: React.FC<TrafficViewerProps> = ({
    }
  }

  const reopenConnection = async () => {
  const reopenConnection = useCallback(async () => {
    closeWebSocket()
    openEmptyWebSocket();
    scrollableRef.current.jumpToBottom();
    setIsSnappedToBottom(true);
  }
  }, [scrollableRef, setIsSnappedToBottom, closeWebSocket, openEmptyWebSocket])

  useEffect(() => {
    reopenConnection()
    // eslint-disable-next-line
  }, [webSocketUrl])

  useEffect(() => {
    return () => {

@@ -11,7 +11,7 @@ interface LinkProps {
}

export const Link: React.FC<LinkProps> = ({ link, className, title, children }) => {
  return <a href={link} className={className} title={title} target="_blank">
  return <a href={link} className={className} title={title} target="_blank" rel="noreferrer">
    {children}
  </a>
}

@@ -66,7 +66,7 @@ const SelectList: React.FC<Props> = ({ items, tableName, checkedValues = [], mul
    }

    setCheckedValues(newChecked)
  }, [searchValue, checkedValues, filteredValuesKeys])
  }, [checkedValues, filteredValuesKeys, items, setCheckedValues])

  const dataFieldFunc = (listValue) => listValue.component ? listValue.component :
    <span className={styles.nowrap} title={listValue.value}>

@@ -40,7 +40,7 @@ export const TimelineBarChart: React.FC<TimelineBarChartProps> = ({ timeLineBarC
    if (!data) return;
    const protocolsBarsData = [];
    const prtcNames = [];
    data.map(protocolObj => {
    data.forEach(protocolObj => {
      let obj: { [k: string]: any } = {};
      obj.timestamp = getHoursAndMinutes(protocolObj.timestamp);
      protocolObj.protocols.forEach(protocol => {

@@ -1,5 +1,4 @@
import { atom } from "recoil";
import TrafficViewerApi from "../../components/TrafficViewer/TrafficViewerApi";

const TrafficViewerApiAtom = atom({
  key: "TrafficViewerApiAtom",