Compare commits

..

3 Commits

Author SHA1 Message Date
gadotroee
3a9236a381 Fix replay to be like the test (#1173)
Represent should get entry that Marshaled and UnMarshaled
2022-06-29 11:54:16 +03:00
AmitUp9
2e7fd34210 Move general functions from timeline stats to helpers utils (#1170)
* no message

* format document
2022-06-29 11:15:22 +03:00
gadotroee
01af6aa19c Add reply endpoint for http (#1168) 2022-06-28 18:39:23 +03:00
9 changed files with 370 additions and 29 deletions

View File

@@ -131,6 +131,7 @@ func hostApi(socketHarOutputChannel chan<- *tapApi.OutputChannelItem) *gin.Engin
routes.MetadataRoutes(ginApp)
routes.StatusRoutes(ginApp)
routes.DbRoutes(ginApp)
routes.ReplayRoutes(ginApp)
return ginApp
}
@@ -155,7 +156,7 @@ func runInTapperMode() {
hostMode := os.Getenv(shared.HostModeEnvVar) == "1"
tapOpts := &tap.TapOpts{
HostMode: hostMode,
HostMode: hostMode,
}
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)

View File

@@ -0,0 +1,34 @@
package controllers
import (
"net/http"
"time"
"github.com/gin-gonic/gin"
"github.com/up9inc/mizu/agent/pkg/replay"
"github.com/up9inc/mizu/agent/pkg/validation"
"github.com/up9inc/mizu/logger"
)
const (
replayTimeout = 10 * time.Second
)
func ReplayRequest(c *gin.Context) {
logger.Log.Debug("Starting replay")
replayDetails := &replay.Details{}
if err := c.Bind(replayDetails); err != nil {
c.JSON(http.StatusBadRequest, err)
return
}
logger.Log.Debugf("Validating replay, %v", replayDetails)
if err := validation.Validate(replayDetails); err != nil {
c.JSON(http.StatusBadRequest, err)
return
}
logger.Log.Debug("Executing replay, %v", replayDetails)
result := replay.ExecuteRequest(replayDetails, replayTimeout)
c.JSON(http.StatusOK, result)
}

View File

@@ -60,7 +60,7 @@ func GetTappedPodsStatus() []shared.TappedPodStatus {
func SetNodeToTappedPodMap(nodeToTappedPodsMap shared.NodeToPodsMap) {
summary := nodeToTappedPodsMap.Summary()
logger.Log.Infof("Setting node to tapped pods map to %v", summary)
logger.Log.Debugf("Setting node to tapped pods map to %v", summary)
nodeHostToTappedPodsMap = nodeToTappedPodsMap
}

186
agent/pkg/replay/replay.go Normal file
View File

@@ -0,0 +1,186 @@
package replay
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"strings"
"sync"
"time"
"github.com/google/uuid"
"github.com/up9inc/mizu/agent/pkg/app"
tapApi "github.com/up9inc/mizu/tap/api"
mizuhttp "github.com/up9inc/mizu/tap/extensions/http"
)
var (
inProcessRequestsLocker = sync.Mutex{}
inProcessRequests = 0
)
const maxParallelAction = 5
type Details struct {
Method string `json:"method"`
Url string `json:"url"`
Body string `json:"body"`
Headers map[string]string `json:"headers"`
}
type Response struct {
Success bool `json:"status"`
Data interface{} `json:"data"`
ErrorMessage string `json:"errorMessage"`
}
func incrementCounter() bool {
result := false
inProcessRequestsLocker.Lock()
if inProcessRequests < maxParallelAction {
inProcessRequests++
result = true
}
inProcessRequestsLocker.Unlock()
return result
}
func decrementCounter() {
inProcessRequestsLocker.Lock()
inProcessRequests--
inProcessRequestsLocker.Unlock()
}
func getEntryFromRequestResponse(extension *tapApi.Extension, request *http.Request, response *http.Response) *tapApi.Entry {
captureTime := time.Now()
itemTmp := tapApi.OutputChannelItem{
Protocol: *extension.Protocol,
ConnectionInfo: &tapApi.ConnectionInfo{
ClientIP: "",
ClientPort: "1",
ServerIP: "",
ServerPort: "1",
IsOutgoing: false,
},
Capture: "",
Timestamp: time.Now().UnixMilli(),
Pair: &tapApi.RequestResponsePair{
Request: tapApi.GenericMessage{
IsRequest: true,
CaptureTime: captureTime,
CaptureSize: 0,
Payload: &mizuhttp.HTTPPayload{
Type: mizuhttp.TypeHttpRequest,
Data: request,
},
},
Response: tapApi.GenericMessage{
IsRequest: false,
CaptureTime: captureTime,
CaptureSize: 0,
Payload: &mizuhttp.HTTPPayload{
Type: mizuhttp.TypeHttpResponse,
Data: response,
},
},
},
}
// Analyze is expecting an item that's marshalled and unmarshalled
itemMarshalled, err := json.Marshal(itemTmp)
if err != nil {
return nil
}
var finalItem *tapApi.OutputChannelItem
if err := json.Unmarshal(itemMarshalled, &finalItem); err != nil {
return nil
}
return extension.Dissector.Analyze(finalItem, "", "", "")
}
func ExecuteRequest(replayData *Details, timeout time.Duration) *Response {
if incrementCounter() {
defer decrementCounter()
client := &http.Client{
Timeout: timeout,
}
request, err := http.NewRequest(strings.ToUpper(replayData.Method), replayData.Url, bytes.NewBufferString(replayData.Body))
if err != nil {
return &Response{
Success: false,
Data: nil,
ErrorMessage: err.Error(),
}
}
for headerKey, headerValue := range replayData.Headers {
request.Header.Add(headerKey, headerValue)
}
request.Header.Add("x-mizu", uuid.New().String())
response, requestErr := client.Do(request)
if requestErr != nil {
return &Response{
Success: false,
Data: nil,
ErrorMessage: requestErr.Error(),
}
}
extension := app.ExtensionsMap["http"] // # TODO: maybe pass the extension to the function so it can be tested
entry := getEntryFromRequestResponse(extension, request, response)
base := extension.Dissector.Summarize(entry)
var representation []byte
// Represent is expecting an entry that's marshalled and unmarshalled
entryMarshalled, err := json.Marshal(entry)
if err != nil {
return &Response{
Success: false,
Data: nil,
ErrorMessage: err.Error(),
}
}
var entryUnmarshalled *tapApi.Entry
if err := json.Unmarshal(entryMarshalled, &entryUnmarshalled); err != nil {
return &Response{
Success: false,
Data: nil,
ErrorMessage: err.Error(),
}
}
representation, err = extension.Dissector.Represent(entryUnmarshalled.Request, entryUnmarshalled.Response)
if err != nil {
return &Response{
Success: false,
Data: nil,
ErrorMessage: err.Error(),
}
}
return &Response{
Success: true,
Data: &tapApi.EntryWrapper{
Protocol: *extension.Protocol,
Representation: string(representation),
Data: entryUnmarshalled,
Base: base,
Rules: nil,
IsRulesEnabled: false,
},
ErrorMessage: "",
}
} else {
return &Response{
Success: false,
Data: nil,
ErrorMessage: fmt.Sprintf("reached threshold of %d requests", maxParallelAction),
}
}
}

View File

@@ -0,0 +1,108 @@
package replay
import (
"bytes"
"fmt"
"net/http"
"strings"
"testing"
"time"
"encoding/json"
"github.com/google/uuid"
tapApi "github.com/up9inc/mizu/tap/api"
mizuhttp "github.com/up9inc/mizu/tap/extensions/http"
)
func TestValid(t *testing.T) {
client := &http.Client{
Timeout: 10 * time.Second,
}
tests := map[string]*Details{
"40x": {
Method: "GET",
Url: "http://httpbin.org/status/404",
Body: "",
Headers: map[string]string{},
},
"20x": {
Method: "GET",
Url: "http://httpbin.org/status/200",
Body: "",
Headers: map[string]string{},
},
"50x": {
Method: "GET",
Url: "http://httpbin.org/status/500",
Body: "",
Headers: map[string]string{},
},
// TODO: this should be fixes, currently not working because of header name with ":"
//":path-header": {
// Method: "GET",
// Url: "http://httpbin.org/get",
// Body: "",
// Headers: map[string]string{
// ":path": "/get",
// },
// },
}
for testCaseName, replayData := range tests {
t.Run(fmt.Sprintf("%+v", testCaseName), func(t *testing.T) {
request, err := http.NewRequest(strings.ToUpper(replayData.Method), replayData.Url, bytes.NewBufferString(replayData.Body))
if err != nil {
t.Errorf("Error executing request")
}
for headerKey, headerValue := range replayData.Headers {
request.Header.Add(headerKey, headerValue)
}
request.Header.Add("x-mizu", uuid.New().String())
response, requestErr := client.Do(request)
if requestErr != nil {
t.Errorf("failed: %v, ", requestErr)
}
extensionHttp := &tapApi.Extension{}
dissectorHttp := mizuhttp.NewDissector()
dissectorHttp.Register(extensionHttp)
extensionHttp.Dissector = dissectorHttp
extension := extensionHttp
entry := getEntryFromRequestResponse(extension, request, response)
base := extension.Dissector.Summarize(entry)
// Represent is expecting an entry that's marshalled and unmarshalled
entryMarshalled, err := json.Marshal(entry)
if err != nil {
t.Errorf("failed marshaling entry: %v, ", err)
}
var entryUnmarshalled *tapApi.Entry
if err := json.Unmarshal(entryMarshalled, &entryUnmarshalled); err != nil {
t.Errorf("failed unmarshaling entry: %v, ", err)
}
var representation []byte
representation, err = extension.Dissector.Represent(entryUnmarshalled.Request, entryUnmarshalled.Response)
if err != nil {
t.Errorf("failed: %v, ", err)
}
result := &tapApi.EntryWrapper{
Protocol: *extension.Protocol,
Representation: string(representation),
Data: entry,
Base: base,
Rules: nil,
IsRulesEnabled: false,
}
t.Logf("%+v", result)
//data, _ := json.MarshalIndent(result, "", " ")
//t.Logf("%+v", string(data))
})
}
}

View File

@@ -0,0 +1,13 @@
package routes
import (
"github.com/gin-gonic/gin"
"github.com/up9inc/mizu/agent/pkg/controllers"
)
// ReplayRoutes defines the group of replay routes.
func ReplayRoutes(app *gin.Engine) {
routeGroup := app.Group("/replay")
routeGroup.POST("/", controllers.ReplayRequest)
}

View File

@@ -55,7 +55,6 @@ type WebSocketMessageMetadata struct {
MessageType WebSocketMessageType `json:"messageType,omitempty"`
}
type WebSocketStatusMessage struct {
*WebSocketMessageMetadata
TappingStatus []TappedPodStatus `json:"tappingStatus"`

View File

@@ -1,6 +1,6 @@
import styles from "./TimelineBarChart.module.sass";
import { StatsMode } from "../TrafficStatsModal"
import React, { useCallback, useEffect, useMemo, useState } from "react";
import React, { useEffect, useMemo, useState } from "react";
import {
BarChart,
Bar,
@@ -20,40 +20,24 @@ export const TimelineBarChart: React.FC<TimelineBarChartProps> = ({ timeLineBarC
const [protocolStats, setProtocolStats] = useState([]);
const [protocolsNamesAndColors, setProtocolsNamesAndColors] = useState([]);
const padTo2Digits = useCallback((num) => {
return String(num).padStart(2, '0');
}, [])
const getHoursAndMinutes = useCallback((protocolTimeKey) => {
const time = new Date(protocolTimeKey)
const hoursAndMinutes = padTo2Digits(time.getHours()) + ':' + padTo2Digits(time.getMinutes());
return hoursAndMinutes;
}, [padTo2Digits])
const creatUniqueObjArray = useCallback((objArray) => {
return [
...new Map(objArray.map((item) => [item["name"], item])).values(),
];
}, [])
useEffect(() => {
if (!data) return;
const protocolsBarsData = [];
const prtcNames = [];
data.forEach(protocolObj => {
let obj: { [k: string]: any } = {};
obj.timestamp = getHoursAndMinutes(protocolObj.timestamp);
obj.timestamp = Utils.getHoursAndMinutes(protocolObj.timestamp);
protocolObj.protocols.forEach(protocol => {
obj[`${protocol.name}`] = protocol[StatsMode[timeLineBarChartMode]];
prtcNames.push({ name: protocol.name, color: protocol.color });
})
protocolsBarsData.push(obj);
})
const uniqueObjArray = creatUniqueObjArray(prtcNames);
const uniqueObjArray = Utils.creatUniqueObjArrayByProp(prtcNames, "name")
protocolsBarsData.sort((a, b) => a.timestamp < b.timestamp ? -1 : 1);
setProtocolStats(protocolsBarsData);
setProtocolsNamesAndColors(uniqueObjArray);
}, [data, timeLineBarChartMode, setProtocolStats, setProtocolsNamesAndColors, creatUniqueObjArray, getHoursAndMinutes])
}, [data, timeLineBarChartMode])
const bars = useMemo(() => protocolsNamesAndColors.map((protocolToDIsplay) => {
return <Bar key={protocolToDIsplay.name} dataKey={protocolToDIsplay.name} stackId="a" fill={protocolToDIsplay.color} />
@@ -61,7 +45,7 @@ export const TimelineBarChart: React.FC<TimelineBarChartProps> = ({ timeLineBarC
return (
<div className={styles.barChartContainer}>
<BarChart
{protocolStats.length > 0 && <BarChart
width={730}
height={250}
data={protocolStats}
@@ -77,7 +61,7 @@ export const TimelineBarChart: React.FC<TimelineBarChartProps> = ({ timeLineBarC
<Tooltip formatter={(value) => timeLineBarChartMode === "VOLUME" ? Utils.humanFileSize(value) : value + " Requests"} />
<Legend />
{bars}
</BarChart>
</BarChart>}
</div>
);
}

View File

@@ -2,10 +2,10 @@ const IP_ADDRESS_REGEX = /([0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3})(:([0-9]{
export class Utils {
static isIpAddress = (address: string): boolean => IP_ADDRESS_REGEX.test(address)
static lineNumbersInString = (code:string): number => code.split("\n").length;
static isIpAddress = (address: string): boolean => IP_ADDRESS_REGEX.test(address)
static lineNumbersInString = (code: string): number => code.split("\n").length;
static humanFileSize(bytes, si=false, dp=1) {
static humanFileSize(bytes, si = false, dp = 1) {
const thresh = si ? 1000 : 1024;
if (Math.abs(bytes) < thresh) {
@@ -16,7 +16,7 @@ export class Utils {
? ['kB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']
: ['KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB'];
let u = -1;
const r = 10**dp;
const r = 10 ** dp;
do {
bytes /= thresh;
@@ -26,4 +26,20 @@ export class Utils {
return bytes.toFixed(dp) + ' ' + units[u];
}
static padTo2Digits = (num) => {
return String(num).padStart(2, '0');
}
static getHoursAndMinutes = (protocolTimeKey) => {
const time = new Date(protocolTimeKey)
const hoursAndMinutes = Utils.padTo2Digits(time.getHours()) + ':' + Utils.padTo2Digits(time.getMinutes());
return hoursAndMinutes;
}
static creatUniqueObjArrayByProp = (objArray, prop) => {
const map = new Map(objArray.map((item) => [item[prop], item])).values()
return Array.from(map);
}
}