|
|
|
|
@@ -2,6 +2,7 @@ package providers
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"reflect"
|
|
|
|
|
"sync"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/jinzhu/copier"
|
|
|
|
|
@@ -19,18 +20,18 @@ type GeneralStats struct {
|
|
|
|
|
type BucketStats []*TimeFrameStatsValue
|
|
|
|
|
|
|
|
|
|
type TimeFrameStatsValue struct {
|
|
|
|
|
BucketTime time.Time
|
|
|
|
|
ProtocolStats map[string]ProtocolStats
|
|
|
|
|
BucketTime time.Time `json:"timestamp"`
|
|
|
|
|
ProtocolStats map[string]ProtocolStats `json:"protocols"`
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type ProtocolStats struct {
|
|
|
|
|
MethodsStats map[string]*SizeAndEntriesCount
|
|
|
|
|
Color string
|
|
|
|
|
MethodsStats map[string]*SizeAndEntriesCount `json:"methods"`
|
|
|
|
|
Color string `json:"color"`
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type SizeAndEntriesCount struct {
|
|
|
|
|
EntriesCount int
|
|
|
|
|
VolumeInBytes int
|
|
|
|
|
EntriesCount int `json:"entriesCount"`
|
|
|
|
|
VolumeInBytes int `json:"volumeInBytes"`
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type AccumulativeStatsCounter struct {
|
|
|
|
|
@@ -45,9 +46,19 @@ type AccumulativeStatsProtocol struct {
|
|
|
|
|
Methods []*AccumulativeStatsCounter `json:"methods"`
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type AccumulativeStatsProtocolTime struct {
|
|
|
|
|
ProtocolsData []*AccumulativeStatsProtocol `json:"protocols"`
|
|
|
|
|
Time int64 `json:"timestamp"`
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
|
generalStats = GeneralStats{}
|
|
|
|
|
bucketsStats = BucketStats{}
|
|
|
|
|
generalStats = GeneralStats{}
|
|
|
|
|
bucketsStats = BucketStats{}
|
|
|
|
|
bucketStatsLocker = sync.Mutex{}
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
|
InternalBucketThreshold = time.Minute * 1
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
func ResetGeneralStats() {
|
|
|
|
|
@@ -58,33 +69,34 @@ func GetGeneralStats() GeneralStats {
|
|
|
|
|
return generalStats
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func GetAccumulativeStats() []*AccumulativeStatsProtocol {
|
|
|
|
|
func getBucketStatsCopy() BucketStats {
|
|
|
|
|
bucketStatsCopy := BucketStats{}
|
|
|
|
|
bucketStatsLocker.Lock()
|
|
|
|
|
if err := copier.Copy(&bucketStatsCopy, bucketsStats); err != nil {
|
|
|
|
|
logger.Log.Errorf("Error while copying src stats into temporary copied object")
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
bucketStatsLocker.Unlock()
|
|
|
|
|
return bucketStatsCopy
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func GetAccumulativeStats() []*AccumulativeStatsProtocol {
|
|
|
|
|
bucketStatsCopy := getBucketStatsCopy()
|
|
|
|
|
if bucketStatsCopy == nil {
|
|
|
|
|
return make([]*AccumulativeStatsProtocol, 0)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
result := make(map[string]*AccumulativeStatsProtocol, 0)
|
|
|
|
|
protocolToColor := make(map[string]string, 0)
|
|
|
|
|
methodsPerProtocolAggregated := make(map[string]map[string]*AccumulativeStatsCounter, 0)
|
|
|
|
|
for _, countersOfTimeFrame := range bucketStatsCopy {
|
|
|
|
|
for protocolName, value := range countersOfTimeFrame.ProtocolStats {
|
|
|
|
|
|
|
|
|
|
if _, found := result[protocolName]; !found {
|
|
|
|
|
result[protocolName] = &AccumulativeStatsProtocol{
|
|
|
|
|
AccumulativeStatsCounter: AccumulativeStatsCounter{
|
|
|
|
|
Name: protocolName,
|
|
|
|
|
EntriesCount: 0,
|
|
|
|
|
VolumeSizeBytes: 0,
|
|
|
|
|
},
|
|
|
|
|
Color: value.Color,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if _, found := methodsPerProtocolAggregated[protocolName]; !found {
|
|
|
|
|
methodsPerProtocolAggregated[protocolName] = map[string]*AccumulativeStatsCounter{}
|
|
|
|
|
if _, ok := protocolToColor[protocolName]; !ok {
|
|
|
|
|
protocolToColor[protocolName] = value.Color
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for method, countersValue := range value.MethodsStats {
|
|
|
|
|
if _, found := methodsPerProtocolAggregated[protocolName]; !found {
|
|
|
|
|
methodsPerProtocolAggregated[protocolName] = map[string]*AccumulativeStatsCounter{}
|
|
|
|
|
}
|
|
|
|
|
if _, found := methodsPerProtocolAggregated[protocolName][method]; !found {
|
|
|
|
|
methodsPerProtocolAggregated[protocolName][method] = &AccumulativeStatsCounter{
|
|
|
|
|
Name: method,
|
|
|
|
|
@@ -92,23 +104,116 @@ func GetAccumulativeStats() []*AccumulativeStatsProtocol {
|
|
|
|
|
VolumeSizeBytes: 0,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
result[protocolName].AccumulativeStatsCounter.EntriesCount += countersValue.EntriesCount
|
|
|
|
|
methodsPerProtocolAggregated[protocolName][method].EntriesCount += countersValue.EntriesCount
|
|
|
|
|
result[protocolName].AccumulativeStatsCounter.VolumeSizeBytes += countersValue.VolumeInBytes
|
|
|
|
|
methodsPerProtocolAggregated[protocolName][method].VolumeSizeBytes += countersValue.VolumeInBytes
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
finalResult := make([]*AccumulativeStatsProtocol, 0)
|
|
|
|
|
for _, value := range result {
|
|
|
|
|
methodsForProtocol := make([]*AccumulativeStatsCounter, 0)
|
|
|
|
|
for _, methodValue := range methodsPerProtocolAggregated[value.Name] {
|
|
|
|
|
methodsForProtocol = append(methodsForProtocol, methodValue)
|
|
|
|
|
return ConvertToPieData(methodsPerProtocolAggregated, protocolToColor)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func ConvertToPieData(methodsPerProtocolAggregated map[string]map[string]*AccumulativeStatsCounter, protocolToColor map[string]string) []*AccumulativeStatsProtocol {
|
|
|
|
|
protocolsData := make([]*AccumulativeStatsProtocol, 0)
|
|
|
|
|
for protocolName, value := range methodsPerProtocolAggregated {
|
|
|
|
|
entriesCount := 0
|
|
|
|
|
volumeSizeBytes := 0
|
|
|
|
|
methods := make([]*AccumulativeStatsCounter, 0)
|
|
|
|
|
for _, methodAccData := range value {
|
|
|
|
|
entriesCount += methodAccData.EntriesCount
|
|
|
|
|
volumeSizeBytes += methodAccData.VolumeSizeBytes
|
|
|
|
|
methods = append(methods, methodAccData)
|
|
|
|
|
}
|
|
|
|
|
value.Methods = methodsForProtocol
|
|
|
|
|
finalResult = append(finalResult, value)
|
|
|
|
|
protocolsData = append(protocolsData, &AccumulativeStatsProtocol{
|
|
|
|
|
AccumulativeStatsCounter: AccumulativeStatsCounter{
|
|
|
|
|
Name: protocolName,
|
|
|
|
|
EntriesCount: entriesCount,
|
|
|
|
|
VolumeSizeBytes: volumeSizeBytes,
|
|
|
|
|
},
|
|
|
|
|
Color: protocolToColor[protocolName],
|
|
|
|
|
Methods: methods,
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
return protocolsData
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func GetAccumulativeStatsTiming(intervalSeconds int, numberOfBars int) []*AccumulativeStatsProtocolTime {
|
|
|
|
|
bucketStatsCopy := getBucketStatsCopy()
|
|
|
|
|
if len(bucketStatsCopy) == 0 {
|
|
|
|
|
return make([]*AccumulativeStatsProtocolTime, 0)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protocolToColor := make(map[string]string, 0)
|
|
|
|
|
methodsPerProtocolPerTimeAggregated := make(map[time.Time]map[string]map[string]*AccumulativeStatsCounter, 0)
|
|
|
|
|
|
|
|
|
|
// TODO: Extract to function and add tests for those values
|
|
|
|
|
lastBucketTime := time.Now().UTC().Add(-1 * InternalBucketThreshold / 2).Round(InternalBucketThreshold)
|
|
|
|
|
firstBucketTime := lastBucketTime.Add(-1 * time.Second * time.Duration(intervalSeconds*(numberOfBars-1)))
|
|
|
|
|
bucketStatsIndex := len(bucketStatsCopy) - 1
|
|
|
|
|
|
|
|
|
|
for bucketStatsIndex >= 0 {
|
|
|
|
|
currentBucketTime := bucketStatsCopy[bucketStatsIndex].BucketTime
|
|
|
|
|
if currentBucketTime.After(firstBucketTime) || currentBucketTime.Equal(firstBucketTime) {
|
|
|
|
|
resultBucketRoundedKey := currentBucketTime.Add(-1 * time.Second * time.Duration(intervalSeconds) / 2).Round(time.Second * time.Duration(intervalSeconds))
|
|
|
|
|
|
|
|
|
|
for protocolName, data := range bucketStatsCopy[bucketStatsIndex].ProtocolStats {
|
|
|
|
|
if _, ok := protocolToColor[protocolName]; !ok {
|
|
|
|
|
protocolToColor[protocolName] = data.Color
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for methodName, dataOfMethod := range data.MethodsStats {
|
|
|
|
|
|
|
|
|
|
if _, ok := methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey]; !ok {
|
|
|
|
|
methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey] = map[string]map[string]*AccumulativeStatsCounter{}
|
|
|
|
|
}
|
|
|
|
|
if _, ok := methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName]; !ok {
|
|
|
|
|
methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName] = map[string]*AccumulativeStatsCounter{}
|
|
|
|
|
}
|
|
|
|
|
if _, ok := methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName]; !ok {
|
|
|
|
|
methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName] = &AccumulativeStatsCounter{
|
|
|
|
|
Name: methodName,
|
|
|
|
|
EntriesCount: 0,
|
|
|
|
|
VolumeSizeBytes: 0,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName].EntriesCount += dataOfMethod.EntriesCount
|
|
|
|
|
methodsPerProtocolPerTimeAggregated[resultBucketRoundedKey][protocolName][methodName].VolumeSizeBytes += dataOfMethod.VolumeInBytes
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
bucketStatsIndex--
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return ConvertToTimelineData(methodsPerProtocolPerTimeAggregated, protocolToColor)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func ConvertToTimelineData(methodsPerProtocolPerTimeAggregated map[time.Time]map[string]map[string]*AccumulativeStatsCounter, protocolToColor map[string]string) []*AccumulativeStatsProtocolTime {
|
|
|
|
|
finalResult := make([]*AccumulativeStatsProtocolTime, 0)
|
|
|
|
|
for timeKey, item := range methodsPerProtocolPerTimeAggregated {
|
|
|
|
|
protocolsData := make([]*AccumulativeStatsProtocol, 0)
|
|
|
|
|
for protocolName := range item {
|
|
|
|
|
entriesCount := 0
|
|
|
|
|
volumeSizeBytes := 0
|
|
|
|
|
methods := make([]*AccumulativeStatsCounter, 0)
|
|
|
|
|
for _, methodAccData := range methodsPerProtocolPerTimeAggregated[timeKey][protocolName] {
|
|
|
|
|
entriesCount += methodAccData.EntriesCount
|
|
|
|
|
volumeSizeBytes += methodAccData.VolumeSizeBytes
|
|
|
|
|
methods = append(methods, methodAccData)
|
|
|
|
|
}
|
|
|
|
|
protocolsData = append(protocolsData, &AccumulativeStatsProtocol{
|
|
|
|
|
AccumulativeStatsCounter: AccumulativeStatsCounter{
|
|
|
|
|
Name: protocolName,
|
|
|
|
|
EntriesCount: entriesCount,
|
|
|
|
|
VolumeSizeBytes: volumeSizeBytes,
|
|
|
|
|
},
|
|
|
|
|
Color: protocolToColor[protocolName],
|
|
|
|
|
Methods: methods,
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
finalResult = append(finalResult, &AccumulativeStatsProtocolTime{
|
|
|
|
|
Time: timeKey.UnixMilli(),
|
|
|
|
|
ProtocolsData: protocolsData,
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
return finalResult
|
|
|
|
|
}
|
|
|
|
|
@@ -128,8 +233,15 @@ func EntryAdded(size int, summery *api.BaseEntry) {
|
|
|
|
|
generalStats.LastEntryTimestamp = currentTimestamp
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//GetBucketOfTimeStamp Round the entry to the nearest threshold (one minute) floored (e.g: 15:31:45 -> 15:31:00)
|
|
|
|
|
func GetBucketOfTimeStamp(timestamp int64) time.Time {
|
|
|
|
|
entryTimeStampAsTime := time.UnixMilli(timestamp)
|
|
|
|
|
return entryTimeStampAsTime.Add(-1 * InternalBucketThreshold / 2).Round(InternalBucketThreshold)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func addToBucketStats(size int, summery *api.BaseEntry) {
|
|
|
|
|
entryTimeBucketRounded := time.Unix(summery.Timestamp, 0).Round(time.Minute * 1)
|
|
|
|
|
entryTimeBucketRounded := GetBucketOfTimeStamp(summery.Timestamp)
|
|
|
|
|
|
|
|
|
|
if len(bucketsStats) == 0 {
|
|
|
|
|
bucketsStats = append(bucketsStats, &TimeFrameStatsValue{
|
|
|
|
|
BucketTime: entryTimeBucketRounded,
|
|
|
|
|
|