roee changes -

trying to fix agent to work with the "api" object) - ***still not working***
This commit is contained in:
Roee Gadot
2021-08-18 08:28:22 +03:00
parent e1267b6c44
commit 77783a58e9
19 changed files with 383 additions and 202 deletions

View File

@@ -5,7 +5,10 @@ import (
"context"
"encoding/json"
"fmt"
"go.mongodb.org/mongo-driver/bson/primitive"
"mizuserver/pkg/database"
"mizuserver/pkg/holder"
"net/http"
"net/url"
"os"
"path"
@@ -16,6 +19,7 @@ import (
"github.com/google/martian/har"
"github.com/romana/rlog"
"github.com/up9inc/mizu/tap"
tapApi "github.com/up9inc/mizu/tap/api"
"mizuserver/pkg/models"
"mizuserver/pkg/resolver"
@@ -46,7 +50,7 @@ func StartResolving(namespace string) {
holder.SetResolver(res)
}
func StartReadingEntries(harChannel <-chan *tap.OutputChannelItem, workingDir *string) {
func StartReadingEntries(harChannel <-chan *tapApi.OutputChannelItem, workingDir *string) {
if workingDir != nil && *workingDir != "" {
startReadingFiles(*workingDir)
} else {
@@ -100,14 +104,18 @@ func startReadingFiles(workingDir string) {
}
}
func startReadingChannel(outputItems <-chan *tap.OutputChannelItem) {
func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem) {
if outputItems == nil {
panic("Channel of captured messages is nil")
}
// for item := range outputItems {
// saveHarToDb(item.HarEntry, item.ConnectionInfo)
// }
for item := range outputItems {
if harEntry, err := models.NewEntry(item.Data.Request.Orig.(*http.Request), item.Data.Request.CaptureTime, item.Data.Response.Orig.(*http.Response), item.Data.Response.CaptureTime); err == nil {
saveHarToDb(harEntry, item.ConnectionInfo)
} else {
rlog.Errorf("Error when creating HTTP entry")
}
}
}
func StartReadingOutbound(outboundLinkChannel <-chan *tap.OutboundLink) {
@@ -117,59 +125,59 @@ func StartReadingOutbound(outboundLinkChannel <-chan *tap.OutboundLink) {
}
}
// func saveHarToDb(entry *har.Entry, connectionInfo *tap.ConnectionInfo) {
// entryBytes, _ := json.Marshal(entry)
// serviceName, urlPath := getServiceNameFromUrl(entry.Request.URL)
// entryId := primitive.NewObjectID().Hex()
// var (
// resolvedSource string
// resolvedDestination string
// )
// if k8sResolver != nil {
// unresolvedSource := connectionInfo.ClientIP
// resolvedSource = k8sResolver.Resolve(unresolvedSource)
// if resolvedSource == "" {
// rlog.Debugf("Cannot find resolved name to source: %s\n", unresolvedSource)
// if os.Getenv("SKIP_NOT_RESOLVED_SOURCE") == "1" {
// return
// }
// }
// unresolvedDestination := fmt.Sprintf("%s:%s", connectionInfo.ServerIP, connectionInfo.ServerPort)
// resolvedDestination = k8sResolver.Resolve(unresolvedDestination)
// if resolvedDestination == "" {
// rlog.Debugf("Cannot find resolved name to dest: %s\n", unresolvedDestination)
// if os.Getenv("SKIP_NOT_RESOLVED_DEST") == "1" {
// return
// }
// }
// }
func saveHarToDb(entry *har.Entry, connectionInfo *tapApi.ConnectionInfo) {
entryBytes, _ := json.Marshal(entry)
serviceName, urlPath := getServiceNameFromUrl(entry.Request.URL)
entryId := primitive.NewObjectID().Hex()
var (
resolvedSource string
resolvedDestination string
)
if k8sResolver != nil {
unresolvedSource := connectionInfo.ClientIP
resolvedSource = k8sResolver.Resolve(unresolvedSource)
if resolvedSource == "" {
rlog.Debugf("Cannot find resolved name to source: %s\n", unresolvedSource)
if os.Getenv("SKIP_NOT_RESOLVED_SOURCE") == "1" {
return
}
}
unresolvedDestination := fmt.Sprintf("%s:%s", connectionInfo.ServerIP, connectionInfo.ServerPort)
resolvedDestination = k8sResolver.Resolve(unresolvedDestination)
if resolvedDestination == "" {
rlog.Debugf("Cannot find resolved name to dest: %s\n", unresolvedDestination)
if os.Getenv("SKIP_NOT_RESOLVED_DEST") == "1" {
return
}
}
}
// mizuEntry := models.MizuEntry{
// EntryId: entryId,
// Entry: string(entryBytes), // simple way to store it and not convert to bytes
// Service: serviceName,
// Url: entry.Request.URL,
// Path: urlPath,
// Method: entry.Request.Method,
// Status: entry.Response.Status,
// RequestSenderIp: connectionInfo.ClientIP,
// Timestamp: entry.StartedDateTime.UnixNano() / int64(time.Millisecond),
// ResolvedSource: resolvedSource,
// ResolvedDestination: resolvedDestination,
// IsOutgoing: connectionInfo.IsOutgoing,
// }
// mizuEntry.EstimatedSizeBytes = getEstimatedEntrySizeBytes(mizuEntry)
// database.CreateEntry(&mizuEntry)
mizuEntry := models.MizuEntry{
EntryId: entryId,
Entry: string(entryBytes), // simple way to store it and not convert to bytes
Service: serviceName,
Url: entry.Request.URL,
Path: urlPath,
Method: entry.Request.Method,
Status: entry.Response.Status,
RequestSenderIp: connectionInfo.ClientIP,
Timestamp: entry.StartedDateTime.UnixNano() / int64(time.Millisecond),
ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination,
IsOutgoing: connectionInfo.IsOutgoing,
}
mizuEntry.EstimatedSizeBytes = getEstimatedEntrySizeBytes(mizuEntry)
database.CreateEntry(&mizuEntry)
// baseEntry := models.BaseEntryDetails{}
// if err := models.GetEntry(&mizuEntry, &baseEntry); err != nil {
// return
// }
// baseEntry.Rules = models.RunValidationRulesState(*entry, serviceName)
// baseEntry.Latency = entry.Timings.Receive
// baseEntryBytes, _ := models.CreateBaseEntryWebSocketMessage(&baseEntry)
// BroadcastToBrowserClients(baseEntryBytes)
// }
baseEntry := models.BaseEntryDetails{}
if err := models.GetEntry(&mizuEntry, &baseEntry); err != nil {
return
}
baseEntry.Rules = models.RunValidationRulesState(*entry, serviceName)
baseEntry.Latency = entry.Timings.Receive
baseEntryBytes, _ := models.CreateBaseEntryWebSocketMessage(&baseEntry)
BroadcastToBrowserClients(baseEntryBytes)
}
func getServiceNameFromUrl(inputUrl string) (string, string) {
parsed, err := url.Parse(inputUrl)

View File

@@ -3,6 +3,7 @@ package api
import (
"encoding/json"
"fmt"
tapApi "github.com/up9inc/mizu/tap/api"
"mizuserver/pkg/models"
"mizuserver/pkg/providers"
"mizuserver/pkg/up9"
@@ -10,7 +11,6 @@ import (
"github.com/romana/rlog"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/tap"
)
var browserClientSocketUUIDs = make([]int, 0)
@@ -18,7 +18,7 @@ var socketListLock = sync.Mutex{}
type RoutesEventHandlers struct {
EventHandlers
SocketHarOutChannel chan<- *tap.OutputChannelItem
SocketOutChannel chan<- *tapApi.OutputChannelItem
}
func init() {
@@ -71,7 +71,7 @@ func (h *RoutesEventHandlers) WebSocketMessage(_ int, message []byte) {
if err != nil {
rlog.Infof("Could not unmarshal message of message type %s %v\n", socketMessageBase.MessageType, err)
} else {
h.SocketHarOutChannel <- tappedEntryMessage.Data
h.SocketOutChannel <- tappedEntryMessage.Data
}
case shared.WebSocketMessageTypeUpdateStatus:
var statusMessage shared.WebSocketStatusMessage