Compare commits

...

2 Commits

Author SHA1 Message Date
M. Mert Yıldıran
308fa78955 TRA-4383 Calculate request and response sizes and display them instead of BodySize field (#897)
* Define `ReadProgress` struct and update `Dissector` interface such that the `bufio.Reader` progress can be learned on item emitting

* Display the `requestSize` and `responseSize` fields in the UI

* Update the tests

* publish ui-common version 1.0.130 and bump to this version in ui/package.json file

Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
Co-authored-by: Roee Gadot <roee.gadot@up9.com>
2022-03-21 19:34:59 +02:00
RoyUP9
cff5987ed4 Added check pre install (#905) 2022-03-21 17:19:04 +02:00
33 changed files with 232 additions and 126 deletions

View File

@@ -120,7 +120,17 @@ func GetEntry(c *gin.Context) {
extension := app.ExtensionsMap[entry.Protocol.Name] extension := app.ExtensionsMap[entry.Protocol.Name]
base := extension.Dissector.Summarize(entry) base := extension.Dissector.Summarize(entry)
representation, bodySize, _ := extension.Dissector.Represent(entry.Request, entry.Response) var representation []byte
representation, err = extension.Dissector.Represent(entry.Request, entry.Response)
if err != nil {
c.JSON(http.StatusNotFound, gin.H{
"error": true,
"type": "error",
"autoClose": "5000",
"msg": err.Error(),
})
return // exit
}
var rules []map[string]interface{} var rules []map[string]interface{}
var isRulesEnabled bool var isRulesEnabled bool
@@ -137,7 +147,6 @@ func GetEntry(c *gin.Context) {
c.JSON(http.StatusOK, tapApi.EntryWrapper{ c.JSON(http.StatusOK, tapApi.EntryWrapper{
Protocol: entry.Protocol, Protocol: entry.Protocol,
Representation: string(representation), Representation: string(representation),
BodySize: bodySize,
Data: entry, Data: entry,
Base: base, Base: base,
Rules: rules, Rules: rules,

View File

@@ -27,5 +27,6 @@ func init() {
} }
checkCmd.Flags().Bool(configStructs.PreTapCheckName, defaultCheckConfig.PreTap, "Check pre-tap Mizu installation for potential problems") checkCmd.Flags().Bool(configStructs.PreTapCheckName, defaultCheckConfig.PreTap, "Check pre-tap Mizu installation for potential problems")
checkCmd.Flags().Bool(configStructs.PreInstallCheckName, defaultCheckConfig.PreInstall, "Check pre-install Mizu installation for potential problems")
checkCmd.Flags().Bool(configStructs.ImagePullCheckName, defaultCheckConfig.ImagePull, "Test connectivity to container image registry by creating and removing a temporary pod in 'default' namespace") checkCmd.Flags().Bool(configStructs.ImagePullCheckName, defaultCheckConfig.ImagePull, "Test connectivity to container image registry by creating and removing a temporary pod in 'default' namespace")
} }

View File

@@ -4,13 +4,15 @@ import (
"context" "context"
"embed" "embed"
"fmt" "fmt"
"github.com/up9inc/mizu/cli/bucket"
"github.com/up9inc/mizu/cli/config" "github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/uiUtils" "github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared/kubernetes" "github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/shared/logger"
rbac "k8s.io/api/rbac/v1" rbac "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/runtime" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
"strings"
) )
func TapKubernetesPermissions(ctx context.Context, embedFS embed.FS, kubernetesProvider *kubernetes.Provider) bool { func TapKubernetesPermissions(ctx context.Context, embedFS embed.FS, kubernetesProvider *kubernetes.Provider) bool {
@@ -29,42 +31,75 @@ func TapKubernetesPermissions(ctx context.Context, embedFS embed.FS, kubernetesP
return false return false
} }
obj, err := getDecodedObject(data) decode := scheme.Codecs.UniversalDeserializer().Decode
obj, _, err := decode(data, nil, nil)
if err != nil { if err != nil {
logger.Log.Errorf("%v error while checking kubernetes permissions, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err) logger.Log.Errorf("%v error while checking kubernetes permissions, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
return false return false
} }
var rules []rbac.PolicyRule switch resource := obj.(type) {
if config.Config.IsNsRestrictedMode() { case *rbac.Role:
rules = obj.(*rbac.Role).Rules return checkRulesPermissions(ctx, kubernetesProvider, resource.Rules, config.Config.MizuResourcesNamespace)
} else { case *rbac.ClusterRole:
rules = obj.(*rbac.ClusterRole).Rules return checkRulesPermissions(ctx, kubernetesProvider, resource.Rules, "")
} }
return checkPermissions(ctx, kubernetesProvider, rules) logger.Log.Errorf("%v error while checking kubernetes permissions, err: resource of type 'Role' or 'ClusterRole' not found in permission files", fmt.Sprintf(uiUtils.Red, "✗"))
return false
} }
func getDecodedObject(data []byte) (runtime.Object, error) { func InstallKubernetesPermissions(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool {
decode := scheme.Codecs.UniversalDeserializer().Decode logger.Log.Infof("\nkubernetes-permissions\n--------------------")
obj, _, err := decode(data, nil, nil) bucketProvider := bucket.NewProvider(config.Config.Install.TemplateUrl, bucket.DefaultTimeout)
installTemplate, err := bucketProvider.GetInstallTemplate(config.Config.Install.TemplateName)
if err != nil { if err != nil {
return nil, err logger.Log.Errorf("%v error while checking kubernetes permissions, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
return false
} }
return obj, nil resourcesTemplate := strings.Split(installTemplate, "---")[1:]
permissionsExist := true
decode := scheme.Codecs.UniversalDeserializer().Decode
for _, resourceTemplate := range resourcesTemplate {
obj, _, err := decode([]byte(resourceTemplate), nil, nil)
if err != nil {
logger.Log.Errorf("%v error while checking kubernetes permissions, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
return false
}
groupVersionKind := obj.GetObjectKind().GroupVersionKind()
resource := fmt.Sprintf("%vs", strings.ToLower(groupVersionKind.Kind))
permissionsExist = checkCreatePermission(ctx, kubernetesProvider, resource, groupVersionKind.Group, obj.(metav1.Object).GetNamespace()) && permissionsExist
switch resourceObj := obj.(type) {
case *rbac.Role:
permissionsExist = checkRulesPermissions(ctx, kubernetesProvider, resourceObj.Rules, resourceObj.Namespace) && permissionsExist
case *rbac.ClusterRole:
permissionsExist = checkRulesPermissions(ctx, kubernetesProvider, resourceObj.Rules, "") && permissionsExist
}
}
return permissionsExist
} }
func checkPermissions(ctx context.Context, kubernetesProvider *kubernetes.Provider, rules []rbac.PolicyRule) bool { func checkCreatePermission(ctx context.Context, kubernetesProvider *kubernetes.Provider, resource string, group string, namespace string) bool {
exist, err := kubernetesProvider.CanI(ctx, namespace, resource, "create", group)
return checkPermissionExist(group, resource, "create", namespace, exist, err)
}
func checkRulesPermissions(ctx context.Context, kubernetesProvider *kubernetes.Provider, rules []rbac.PolicyRule, namespace string) bool {
permissionsExist := true permissionsExist := true
for _, rule := range rules { for _, rule := range rules {
for _, group := range rule.APIGroups { for _, group := range rule.APIGroups {
for _, resource := range rule.Resources { for _, resource := range rule.Resources {
for _, verb := range rule.Verbs { for _, verb := range rule.Verbs {
exist, err := kubernetesProvider.CanI(ctx, config.Config.MizuResourcesNamespace, resource, verb, group) exist, err := kubernetesProvider.CanI(ctx, namespace, resource, verb, group)
permissionsExist = checkPermissionExist(group, resource, verb, exist, err) && permissionsExist permissionsExist = checkPermissionExist(group, resource, verb, namespace, exist, err) && permissionsExist
} }
} }
} }
@@ -73,15 +108,24 @@ func checkPermissions(ctx context.Context, kubernetesProvider *kubernetes.Provid
return permissionsExist return permissionsExist
} }
func checkPermissionExist(group string, resource string, verb string, exist bool, err error) bool { func checkPermissionExist(group string, resource string, verb string, namespace string, exist bool, err error) bool {
var groupAndNamespace string
if group != "" && namespace != "" {
groupAndNamespace = fmt.Sprintf("in group '%v' and namespace '%v'", group, namespace)
} else if group != "" {
groupAndNamespace = fmt.Sprintf("in group '%v'", group)
} else if namespace != "" {
groupAndNamespace = fmt.Sprintf("in namespace '%v'", namespace)
}
if err != nil { if err != nil {
logger.Log.Errorf("%v error checking permission for %v %v in group '%v', err: %v", fmt.Sprintf(uiUtils.Red, "✗"), verb, resource, group, err) logger.Log.Errorf("%v error checking permission for %v %v %v, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), verb, resource, groupAndNamespace, err)
return false return false
} else if !exist { } else if !exist {
logger.Log.Errorf("%v can't %v %v in group '%v'", fmt.Sprintf(uiUtils.Red, "✗"), verb, resource, group) logger.Log.Errorf("%v can't %v %v %v", fmt.Sprintf(uiUtils.Red, "✗"), verb, resource, groupAndNamespace)
return false return false
} }
logger.Log.Infof("%v can %v %v in group '%v'", fmt.Sprintf(uiUtils.Green, "√"), verb, resource, group) logger.Log.Infof("%v can %v %v %v", fmt.Sprintf(uiUtils.Green, "√"), verb, resource, groupAndNamespace)
return true return true
} }

View File

@@ -31,6 +31,10 @@ func runMizuCheck() {
if checkPassed { if checkPassed {
checkPassed = check.TapKubernetesPermissions(ctx, embedFS, kubernetesProvider) checkPassed = check.TapKubernetesPermissions(ctx, embedFS, kubernetesProvider)
} }
} else if config.Config.Check.PreInstall {
if checkPassed {
checkPassed = check.InstallKubernetesPermissions(ctx, kubernetesProvider)
}
} else { } else {
if checkPassed { if checkPassed {
checkPassed = check.KubernetesResources(ctx, kubernetesProvider) checkPassed = check.KubernetesResources(ctx, kubernetesProvider)

View File

@@ -1,11 +1,13 @@
package configStructs package configStructs
const ( const (
PreTapCheckName = "pre-tap" PreTapCheckName = "pre-tap"
ImagePullCheckName = "image-pull" PreInstallCheckName = "pre-install"
ImagePullCheckName = "image-pull"
) )
type CheckConfig struct { type CheckConfig struct {
PreTap bool `yaml:"pre-tap"` PreTap bool `yaml:"pre-tap"`
ImagePull bool `yaml:"image-pull"` PreInstall bool `yaml:"pre-install"`
ImagePull bool `yaml:"image-pull"`
} }

View File

@@ -19,7 +19,7 @@ import (
const mizuTestEnvVar = "MIZU_TEST" const mizuTestEnvVar = "MIZU_TEST"
var UnknownIp net.IP = net.IP{0, 0, 0, 0} var UnknownIp net.IP = net.IP{0, 0, 0, 0}
var UnknownPort uint16 = 0 var UnknownPort uint16 = 0
type Protocol struct { type Protocol struct {
@@ -83,6 +83,7 @@ type CounterPair struct {
type GenericMessage struct { type GenericMessage struct {
IsRequest bool `json:"isRequest"` IsRequest bool `json:"isRequest"`
CaptureTime time.Time `json:"captureTime"` CaptureTime time.Time `json:"captureTime"`
CaptureSize int `json:"captureSize"`
Payload interface{} `json:"payload"` Payload interface{} `json:"payload"`
} }
@@ -110,13 +111,27 @@ type SuperIdentifier struct {
IsClosedOthers bool IsClosedOthers bool
} }
type ReadProgress struct {
readBytes int
lastCurrent int
}
func (p *ReadProgress) Feed(n int) {
p.readBytes += n
}
func (p *ReadProgress) Current() (n int) {
p.lastCurrent = p.readBytes - p.lastCurrent
return p.lastCurrent
}
type Dissector interface { type Dissector interface {
Register(*Extension) Register(*Extension)
Ping() Ping()
Dissect(b *bufio.Reader, capture Capture, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions, reqResMatcher RequestResponseMatcher) error Dissect(b *bufio.Reader, progress *ReadProgress, capture Capture, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions, reqResMatcher RequestResponseMatcher) error
Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *Entry Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *Entry
Summarize(entry *Entry) *BaseEntry Summarize(entry *Entry) *BaseEntry
Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, err error)
Macros() map[string]string Macros() map[string]string
NewResponseRequestMatcher() RequestResponseMatcher NewResponseRequestMatcher() RequestResponseMatcher
} }
@@ -152,6 +167,8 @@ type Entry struct {
StartTime time.Time `json:"startTime"` StartTime time.Time `json:"startTime"`
Request map[string]interface{} `json:"request"` Request map[string]interface{} `json:"request"`
Response map[string]interface{} `json:"response"` Response map[string]interface{} `json:"response"`
RequestSize int `json:"requestSize"`
ResponseSize int `json:"responseSize"`
ElapsedTime int64 `json:"elapsedTime"` ElapsedTime int64 `json:"elapsedTime"`
Rules ApplicableRules `json:"rules,omitempty"` Rules ApplicableRules `json:"rules,omitempty"`
ContractStatus ContractStatus `json:"contractStatus,omitempty"` ContractStatus ContractStatus `json:"contractStatus,omitempty"`
@@ -164,7 +181,6 @@ type Entry struct {
type EntryWrapper struct { type EntryWrapper struct {
Protocol Protocol `json:"protocol"` Protocol Protocol `json:"protocol"`
Representation string `json:"representation"` Representation string `json:"representation"`
BodySize int64 `json:"bodySize"`
Data *Entry `json:"data"` Data *Entry `json:"data"`
Base *BaseEntry `json:"base"` Base *BaseEntry `json:"base"`
Rules []map[string]interface{} `json:"rulesMatched,omitempty"` Rules []map[string]interface{} `json:"rulesMatched,omitempty"`

View File

@@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect: test-pull-expect:
@mkdir -p expect @mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect4/amqp/\* expect @[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect5/amqp/\* expect

View File

@@ -94,7 +94,7 @@ type AMQPWrapper struct {
Details interface{} `json:"details"` Details interface{} `json:"details"`
} }
func emitAMQP(event interface{}, _type string, method string, connectionInfo *api.ConnectionInfo, captureTime time.Time, emitter api.Emitter, capture api.Capture) { func emitAMQP(event interface{}, _type string, method string, connectionInfo *api.ConnectionInfo, captureTime time.Time, captureSize int, emitter api.Emitter, capture api.Capture) {
request := &api.GenericMessage{ request := &api.GenericMessage{
IsRequest: true, IsRequest: true,
CaptureTime: captureTime, CaptureTime: captureTime,

View File

@@ -39,7 +39,7 @@ func (d dissecting) Ping() {
const amqpRequest string = "amqp_request" const amqpRequest string = "amqp_request"
func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error { func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
r := AmqpReader{b} r := AmqpReader{b}
var remaining int var remaining int
@@ -113,11 +113,11 @@ func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool,
case *BasicPublish: case *BasicPublish:
eventBasicPublish.Body = f.Body eventBasicPublish.Body = f.Body
superIdentifier.Protocol = &protocol superIdentifier.Protocol = &protocol
emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, superTimer.CaptureTime, emitter, capture) emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture)
case *BasicDeliver: case *BasicDeliver:
eventBasicDeliver.Body = f.Body eventBasicDeliver.Body = f.Body
superIdentifier.Protocol = &protocol superIdentifier.Protocol = &protocol
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, superTimer.CaptureTime, emitter, capture) emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture)
} }
case *MethodFrame: case *MethodFrame:
@@ -138,7 +138,7 @@ func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool,
Arguments: m.Arguments, Arguments: m.Arguments,
} }
superIdentifier.Protocol = &protocol superIdentifier.Protocol = &protocol
emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter, capture) emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture)
case *BasicConsume: case *BasicConsume:
eventBasicConsume := &BasicConsume{ eventBasicConsume := &BasicConsume{
@@ -151,7 +151,7 @@ func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool,
Arguments: m.Arguments, Arguments: m.Arguments,
} }
superIdentifier.Protocol = &protocol superIdentifier.Protocol = &protocol
emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter, capture) emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture)
case *BasicDeliver: case *BasicDeliver:
eventBasicDeliver.ConsumerTag = m.ConsumerTag eventBasicDeliver.ConsumerTag = m.ConsumerTag
@@ -171,7 +171,7 @@ func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool,
Arguments: m.Arguments, Arguments: m.Arguments,
} }
superIdentifier.Protocol = &protocol superIdentifier.Protocol = &protocol
emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter, capture) emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture)
case *ExchangeDeclare: case *ExchangeDeclare:
eventExchangeDeclare := &ExchangeDeclare{ eventExchangeDeclare := &ExchangeDeclare{
@@ -185,7 +185,7 @@ func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool,
Arguments: m.Arguments, Arguments: m.Arguments,
} }
superIdentifier.Protocol = &protocol superIdentifier.Protocol = &protocol
emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter, capture) emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture)
case *ConnectionStart: case *ConnectionStart:
eventConnectionStart := &ConnectionStart{ eventConnectionStart := &ConnectionStart{
@@ -196,7 +196,7 @@ func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool,
Locales: m.Locales, Locales: m.Locales,
} }
superIdentifier.Protocol = &protocol superIdentifier.Protocol = &protocol
emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter, capture) emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture)
case *ConnectionClose: case *ConnectionClose:
eventConnectionClose := &ConnectionClose{ eventConnectionClose := &ConnectionClose{
@@ -206,7 +206,7 @@ func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool,
MethodId: m.MethodId, MethodId: m.MethodId,
} }
superIdentifier.Protocol = &protocol superIdentifier.Protocol = &protocol
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, superTimer.CaptureTime, emitter, capture) emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, superTimer.CaptureTime, progress.Current(), emitter, capture)
} }
default: default:
@@ -236,6 +236,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
Namespace: namespace, Namespace: namespace,
Outgoing: item.ConnectionInfo.IsOutgoing, Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails, Request: reqDetails,
RequestSize: item.Pair.Request.CaptureSize,
Timestamp: item.Timestamp, Timestamp: item.Timestamp,
StartTime: item.Pair.Request.CaptureTime, StartTime: item.Pair.Request.CaptureTime,
ElapsedTime: 0, ElapsedTime: 0,
@@ -301,8 +302,7 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
} }
} }
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) { func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, err error) {
bodySize = 0
representation := make(map[string]interface{}) representation := make(map[string]interface{})
var repRequest []interface{} var repRequest []interface{}
switch request["method"].(string) { switch request["method"].(string) {

View File

@@ -122,7 +122,7 @@ func TestDissect(t *testing.T) {
DstPort: "2", DstPort: "2",
} }
reqResMatcher := dissector.NewResponseRequestMatcher() reqResMatcher := dissector.NewResponseRequestMatcher()
err = dissector.Dissect(bufferClient, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) err = dissector.Dissect(bufferClient, &api.ReadProgress{}, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err) panic(err)
} }
@@ -140,7 +140,7 @@ func TestDissect(t *testing.T) {
SrcPort: "2", SrcPort: "2",
DstPort: "1", DstPort: "1",
} }
err = dissector.Dissect(bufferServer, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) err = dissector.Dissect(bufferServer, &api.ReadProgress{}, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err) panic(err)
} }
@@ -319,7 +319,7 @@ func TestRepresent(t *testing.T) {
var objects []string var objects []string
for _, entry := range entries { for _, entry := range entries {
object, _, err := dissector.Represent(entry.Request, entry.Response) object, err := dissector.Represent(entry.Request, entry.Response)
assert.Nil(t, err) assert.Nil(t, err)
objects = append(objects, string(object)) objects = append(objects, string(object))
} }

View File

@@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect: test-pull-expect:
@mkdir -p expect @mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect4/http/\* expect @[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect5/http/\* expect

View File

@@ -47,7 +47,7 @@ func replaceForwardedFor(item *api.OutputChannelItem) {
item.ConnectionInfo.ClientPort = "" item.ConnectionInfo.ClientPort = ""
} }
func handleHTTP2Stream(http2Assembler *Http2Assembler, capture api.Capture, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) error { func handleHTTP2Stream(http2Assembler *Http2Assembler, progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) error {
streamID, messageHTTP1, isGrpc, err := http2Assembler.readMessage() streamID, messageHTTP1, isGrpc, err := http2Assembler.readMessage()
if err != nil { if err != nil {
return err return err
@@ -66,7 +66,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, capture api.Capture, tcpI
streamID, streamID,
"HTTP2", "HTTP2",
) )
item = reqResMatcher.registerRequest(ident, &messageHTTP1, superTimer.CaptureTime, messageHTTP1.ProtoMinor) item = reqResMatcher.registerRequest(ident, &messageHTTP1, superTimer.CaptureTime, progress.Current(), messageHTTP1.ProtoMinor)
if item != nil { if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{ item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.SrcIP, ClientIP: tcpID.SrcIP,
@@ -86,7 +86,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, capture api.Capture, tcpI
streamID, streamID,
"HTTP2", "HTTP2",
) )
item = reqResMatcher.registerResponse(ident, &messageHTTP1, superTimer.CaptureTime, messageHTTP1.ProtoMinor) item = reqResMatcher.registerResponse(ident, &messageHTTP1, superTimer.CaptureTime, progress.Current(), messageHTTP1.ProtoMinor)
if item != nil { if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{ item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.DstIP, ClientIP: tcpID.DstIP,
@@ -111,7 +111,7 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, capture api.Capture, tcpI
return nil return nil
} }
func handleHTTP1ClientStream(b *bufio.Reader, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, req *http.Request, err error) { func handleHTTP1ClientStream(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, req *http.Request, err error) {
req, err = http.ReadRequest(b) req, err = http.ReadRequest(b)
if err != nil { if err != nil {
return return
@@ -139,7 +139,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, capture api.Capture, tcpID *api.Tc
requestCounter, requestCounter,
"HTTP1", "HTTP1",
) )
item := reqResMatcher.registerRequest(ident, req, superTimer.CaptureTime, req.ProtoMinor) item := reqResMatcher.registerRequest(ident, req, superTimer.CaptureTime, progress.Current(), req.ProtoMinor)
if item != nil { if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{ item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.SrcIP, ClientIP: tcpID.SrcIP,
@@ -154,7 +154,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, capture api.Capture, tcpID *api.Tc
return return
} }
func handleHTTP1ServerStream(b *bufio.Reader, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, err error) { func handleHTTP1ServerStream(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, err error) {
var res *http.Response var res *http.Response
res, err = http.ReadResponse(b, nil) res, err = http.ReadResponse(b, nil)
if err != nil { if err != nil {
@@ -183,7 +183,7 @@ func handleHTTP1ServerStream(b *bufio.Reader, capture api.Capture, tcpID *api.Tc
responseCounter, responseCounter,
"HTTP1", "HTTP1",
) )
item := reqResMatcher.registerResponse(ident, res, superTimer.CaptureTime, res.ProtoMinor) item := reqResMatcher.registerResponse(ident, res, superTimer.CaptureTime, progress.Current(), res.ProtoMinor)
if item != nil { if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{ item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.DstIP, ClientIP: tcpID.DstIP,

View File

@@ -86,7 +86,7 @@ func (d dissecting) Ping() {
log.Printf("pong %s", http11protocol.Name) log.Printf("pong %s", http11protocol.Name)
} }
func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error { func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher) reqResMatcher := _reqResMatcher.(*requestResponseMatcher)
var err error var err error
@@ -121,7 +121,7 @@ func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool,
} }
if isHTTP2 { if isHTTP2 {
err = handleHTTP2Stream(http2Assembler, capture, tcpID, superTimer, emitter, options, reqResMatcher) err = handleHTTP2Stream(http2Assembler, progress, capture, tcpID, superTimer, emitter, options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF { if err == io.EOF || err == io.ErrUnexpectedEOF {
break break
} else if err != nil { } else if err != nil {
@@ -130,7 +130,7 @@ func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool,
superIdentifier.Protocol = &http11protocol superIdentifier.Protocol = &http11protocol
} else if isClient { } else if isClient {
var req *http.Request var req *http.Request
switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, capture, tcpID, counterPair, superTimer, emitter, options, reqResMatcher) switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, progress, capture, tcpID, counterPair, superTimer, emitter, options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF { if err == io.EOF || err == io.ErrUnexpectedEOF {
break break
} else if err != nil { } else if err != nil {
@@ -148,7 +148,7 @@ func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool,
tcpID.DstPort, tcpID.DstPort,
"HTTP2", "HTTP2",
) )
item := reqResMatcher.registerRequest(ident, req, superTimer.CaptureTime, req.ProtoMinor) item := reqResMatcher.registerRequest(ident, req, superTimer.CaptureTime, progress.Current(), req.ProtoMinor)
if item != nil { if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{ item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.SrcIP, ClientIP: tcpID.SrcIP,
@@ -162,7 +162,7 @@ func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool,
} }
} }
} else { } else {
switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, capture, tcpID, counterPair, superTimer, emitter, options, reqResMatcher) switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, progress, capture, tcpID, counterPair, superTimer, emitter, options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF { if err == io.EOF || err == io.ErrUnexpectedEOF {
break break
} else if err != nil { } else if err != nil {
@@ -271,14 +271,16 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
IP: item.ConnectionInfo.ServerIP, IP: item.ConnectionInfo.ServerIP,
Port: item.ConnectionInfo.ServerPort, Port: item.ConnectionInfo.ServerPort,
}, },
Namespace: namespace, Namespace: namespace,
Outgoing: item.ConnectionInfo.IsOutgoing, Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails, Request: reqDetails,
Response: resDetails, Response: resDetails,
Timestamp: item.Timestamp, RequestSize: item.Pair.Request.CaptureSize,
StartTime: item.Pair.Request.CaptureTime, ResponseSize: item.Pair.Response.CaptureSize,
ElapsedTime: elapsedTime, Timestamp: item.Timestamp,
HTTPPair: string(httpPair), StartTime: item.Pair.Request.CaptureTime,
ElapsedTime: elapsedTime,
HTTPPair: string(httpPair),
} }
} }
@@ -410,11 +412,9 @@ func representRequest(request map[string]interface{}) (repRequest []interface{})
return return
} }
func representResponse(response map[string]interface{}) (repResponse []interface{}, bodySize int64) { func representResponse(response map[string]interface{}) (repResponse []interface{}) {
repResponse = make([]interface{}, 0) repResponse = make([]interface{}, 0)
bodySize = int64(response["bodySize"].(float64))
details, _ := json.Marshal([]api.TableData{ details, _ := json.Marshal([]api.TableData{
{ {
Name: "Status", Name: "Status",
@@ -428,7 +428,7 @@ func representResponse(response map[string]interface{}) (repResponse []interface
}, },
{ {
Name: "Body Size (bytes)", Name: "Body Size (bytes)",
Value: bodySize, Value: int64(response["bodySize"].(float64)),
Selector: `response.bodySize`, Selector: `response.bodySize`,
}, },
}) })
@@ -471,10 +471,10 @@ func representResponse(response map[string]interface{}) (repResponse []interface
return return
} }
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) { func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, err error) {
representation := make(map[string]interface{}) representation := make(map[string]interface{})
repRequest := representRequest(request) repRequest := representRequest(request)
repResponse, bodySize := representResponse(response) repResponse := representResponse(response)
representation["request"] = repRequest representation["request"] = repRequest
representation["response"] = repResponse representation["response"] = repResponse
object, err = json.Marshal(representation) object, err = json.Marshal(representation)

View File

@@ -124,7 +124,7 @@ func TestDissect(t *testing.T) {
DstPort: "2", DstPort: "2",
} }
reqResMatcher := dissector.NewResponseRequestMatcher() reqResMatcher := dissector.NewResponseRequestMatcher()
err = dissector.Dissect(bufferClient, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) err = dissector.Dissect(bufferClient, &api.ReadProgress{}, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err) panic(err)
} }
@@ -142,7 +142,7 @@ func TestDissect(t *testing.T) {
SrcPort: "2", SrcPort: "2",
DstPort: "1", DstPort: "1",
} }
err = dissector.Dissect(bufferServer, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) err = dissector.Dissect(bufferServer, &api.ReadProgress{}, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err) panic(err)
} }
@@ -321,7 +321,7 @@ func TestRepresent(t *testing.T) {
var objects []string var objects []string
for _, entry := range entries { for _, entry := range entries {
object, _, err := dissector.Represent(entry.Request, entry.Response) object, err := dissector.Represent(entry.Request, entry.Response)
assert.Nil(t, err) assert.Nil(t, err)
objects = append(objects, string(object)) objects = append(objects, string(object))
} }

View File

@@ -24,10 +24,11 @@ func (matcher *requestResponseMatcher) GetMap() *sync.Map {
func (matcher *requestResponseMatcher) SetMaxTry(value int) { func (matcher *requestResponseMatcher) SetMaxTry(value int) {
} }
func (matcher *requestResponseMatcher) registerRequest(ident string, request *http.Request, captureTime time.Time, protoMinor int) *api.OutputChannelItem { func (matcher *requestResponseMatcher) registerRequest(ident string, request *http.Request, captureTime time.Time, captureSize int, protoMinor int) *api.OutputChannelItem {
requestHTTPMessage := api.GenericMessage{ requestHTTPMessage := api.GenericMessage{
IsRequest: true, IsRequest: true,
CaptureTime: captureTime, CaptureTime: captureTime,
CaptureSize: captureSize,
Payload: api.HTTPPayload{ Payload: api.HTTPPayload{
Type: TypeHttpRequest, Type: TypeHttpRequest,
Data: request, Data: request,
@@ -47,10 +48,11 @@ func (matcher *requestResponseMatcher) registerRequest(ident string, request *ht
return nil return nil
} }
func (matcher *requestResponseMatcher) registerResponse(ident string, response *http.Response, captureTime time.Time, protoMinor int) *api.OutputChannelItem { func (matcher *requestResponseMatcher) registerResponse(ident string, response *http.Response, captureTime time.Time, captureSize int, protoMinor int) *api.OutputChannelItem {
responseHTTPMessage := api.GenericMessage{ responseHTTPMessage := api.GenericMessage{
IsRequest: false, IsRequest: false,
CaptureTime: captureTime, CaptureTime: captureTime,
CaptureSize: captureSize,
Payload: api.HTTPPayload{ Payload: api.HTTPPayload{
Type: TypeHttpResponse, Type: TypeHttpResponse,
Data: response, Data: response,

View File

@@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect: test-pull-expect:
@mkdir -p expect @mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect4/kafka/\* expect @[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect5/kafka/\* expect

View File

@@ -35,7 +35,7 @@ func (d dissecting) Ping() {
log.Printf("pong %s", _protocol.Name) log.Printf("pong %s", _protocol.Name)
} }
func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error { func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher) reqResMatcher := _reqResMatcher.(*requestResponseMatcher)
for { for {
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &_protocol { if superIdentifier.Protocol != nil && superIdentifier.Protocol != &_protocol {
@@ -79,13 +79,15 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
IP: item.ConnectionInfo.ServerIP, IP: item.ConnectionInfo.ServerIP,
Port: item.ConnectionInfo.ServerPort, Port: item.ConnectionInfo.ServerPort,
}, },
Namespace: namespace, Namespace: namespace,
Outgoing: item.ConnectionInfo.IsOutgoing, Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails, Request: reqDetails,
Response: item.Pair.Response.Payload.(map[string]interface{})["details"].(map[string]interface{}), Response: item.Pair.Response.Payload.(map[string]interface{})["details"].(map[string]interface{}),
Timestamp: item.Timestamp, RequestSize: item.Pair.Request.CaptureSize,
StartTime: item.Pair.Request.CaptureTime, ResponseSize: item.Pair.Response.CaptureSize,
ElapsedTime: elapsedTime, Timestamp: item.Timestamp,
StartTime: item.Pair.Request.CaptureTime,
ElapsedTime: elapsedTime,
} }
} }
@@ -208,8 +210,7 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
} }
} }
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) { func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, err error) {
bodySize = 0
representation := make(map[string]interface{}) representation := make(map[string]interface{})
apiKey := ApiKey(request["apiKey"].(float64)) apiKey := ApiKey(request["apiKey"].(float64))

View File

@@ -123,7 +123,7 @@ func TestDissect(t *testing.T) {
} }
reqResMatcher := dissector.NewResponseRequestMatcher() reqResMatcher := dissector.NewResponseRequestMatcher()
reqResMatcher.SetMaxTry(10) reqResMatcher.SetMaxTry(10)
err = dissector.Dissect(bufferClient, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) err = dissector.Dissect(bufferClient, &api.ReadProgress{}, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
log.Println(err) log.Println(err)
} }
@@ -141,7 +141,7 @@ func TestDissect(t *testing.T) {
SrcPort: "2", SrcPort: "2",
DstPort: "1", DstPort: "1",
} }
err = dissector.Dissect(bufferServer, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) err = dissector.Dissect(bufferServer, &api.ReadProgress{}, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
log.Println(err) log.Println(err)
} }
@@ -320,7 +320,7 @@ func TestRepresent(t *testing.T) {
var objects []string var objects []string
for _, entry := range entries { for _, entry := range entries {
object, _, err := dissector.Represent(entry.Request, entry.Response) object, err := dissector.Represent(entry.Request, entry.Response)
assert.Nil(t, err) assert.Nil(t, err)
objects = append(objects, string(object)) objects = append(objects, string(object))
} }

View File

@@ -265,6 +265,7 @@ func ReadResponse(r io.Reader, capture api.Capture, tcpID *api.TcpID, counterPai
Request: api.GenericMessage{ Request: api.GenericMessage{
IsRequest: true, IsRequest: true,
CaptureTime: reqResPair.Request.CaptureTime, CaptureTime: reqResPair.Request.CaptureTime,
CaptureSize: int(reqResPair.Request.Size),
Payload: KafkaPayload{ Payload: KafkaPayload{
Data: &KafkaWrapper{ Data: &KafkaWrapper{
Method: apiNames[apiKey], Method: apiNames[apiKey],
@@ -276,6 +277,7 @@ func ReadResponse(r io.Reader, capture api.Capture, tcpID *api.TcpID, counterPai
Response: api.GenericMessage{ Response: api.GenericMessage{
IsRequest: false, IsRequest: false,
CaptureTime: reqResPair.Response.CaptureTime, CaptureTime: reqResPair.Response.CaptureTime,
CaptureSize: int(reqResPair.Response.Size),
Payload: KafkaPayload{ Payload: KafkaPayload{
Data: &KafkaWrapper{ Data: &KafkaWrapper{
Method: apiNames[apiKey], Method: apiNames[apiKey],

View File

@@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect: test-pull-expect:
@mkdir -p expect @mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect4/redis/\* expect @[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect5/redis/\* expect

View File

@@ -6,7 +6,7 @@ import (
"github.com/up9inc/mizu/tap/api" "github.com/up9inc/mizu/tap/api"
) )
func handleClientStream(capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket, reqResMatcher *requestResponseMatcher) error { func handleClientStream(progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket, reqResMatcher *requestResponseMatcher) error {
counterPair.Lock() counterPair.Lock()
counterPair.Request++ counterPair.Request++
requestCounter := counterPair.Request requestCounter := counterPair.Request
@@ -21,7 +21,7 @@ func handleClientStream(capture api.Capture, tcpID *api.TcpID, counterPair *api.
requestCounter, requestCounter,
) )
item := reqResMatcher.registerRequest(ident, request, superTimer.CaptureTime) item := reqResMatcher.registerRequest(ident, request, superTimer.CaptureTime, progress.Current())
if item != nil { if item != nil {
item.Capture = capture item.Capture = capture
item.ConnectionInfo = &api.ConnectionInfo{ item.ConnectionInfo = &api.ConnectionInfo{
@@ -36,7 +36,7 @@ func handleClientStream(capture api.Capture, tcpID *api.TcpID, counterPair *api.
return nil return nil
} }
func handleServerStream(capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket, reqResMatcher *requestResponseMatcher) error { func handleServerStream(progress *api.ReadProgress, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket, reqResMatcher *requestResponseMatcher) error {
counterPair.Lock() counterPair.Lock()
counterPair.Response++ counterPair.Response++
responseCounter := counterPair.Response responseCounter := counterPair.Response
@@ -51,7 +51,7 @@ func handleServerStream(capture api.Capture, tcpID *api.TcpID, counterPair *api.
responseCounter, responseCounter,
) )
item := reqResMatcher.registerResponse(ident, response, superTimer.CaptureTime) item := reqResMatcher.registerResponse(ident, response, superTimer.CaptureTime, progress.Current())
if item != nil { if item != nil {
item.Capture = capture item.Capture = capture
item.ConnectionInfo = &api.ConnectionInfo{ item.ConnectionInfo = &api.ConnectionInfo{

View File

@@ -34,7 +34,7 @@ func (d dissecting) Ping() {
log.Printf("pong %s", protocol.Name) log.Printf("pong %s", protocol.Name)
} }
func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error { func (d dissecting) Dissect(b *bufio.Reader, progress *api.ReadProgress, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher) reqResMatcher := _reqResMatcher.(*requestResponseMatcher)
is := &RedisInputStream{ is := &RedisInputStream{
Reader: b, Reader: b,
@@ -48,9 +48,9 @@ func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool,
} }
if isClient { if isClient {
err = handleClientStream(capture, tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher) err = handleClientStream(progress, capture, tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher)
} else { } else {
err = handleServerStream(capture, tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher) err = handleServerStream(progress, capture, tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher)
} }
if err != nil { if err != nil {
@@ -82,13 +82,15 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
IP: item.ConnectionInfo.ServerIP, IP: item.ConnectionInfo.ServerIP,
Port: item.ConnectionInfo.ServerPort, Port: item.ConnectionInfo.ServerPort,
}, },
Namespace: namespace, Namespace: namespace,
Outgoing: item.ConnectionInfo.IsOutgoing, Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails, Request: reqDetails,
Response: resDetails, Response: resDetails,
Timestamp: item.Timestamp, RequestSize: item.Pair.Request.CaptureSize,
StartTime: item.Pair.Request.CaptureTime, ResponseSize: item.Pair.Response.CaptureSize,
ElapsedTime: elapsedTime, Timestamp: item.Timestamp,
StartTime: item.Pair.Request.CaptureTime,
ElapsedTime: elapsedTime,
} }
} }
@@ -131,8 +133,7 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
} }
} }
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) { func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, err error) {
bodySize = 0
representation := make(map[string]interface{}) representation := make(map[string]interface{})
repRequest := representGeneric(request, `request.`) repRequest := representGeneric(request, `request.`)
repResponse := representGeneric(response, `response.`) repResponse := representGeneric(response, `response.`)

View File

@@ -123,7 +123,7 @@ func TestDissect(t *testing.T) {
DstPort: "2", DstPort: "2",
} }
reqResMatcher := dissector.NewResponseRequestMatcher() reqResMatcher := dissector.NewResponseRequestMatcher()
err = dissector.Dissect(bufferClient, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) err = dissector.Dissect(bufferClient, &api.ReadProgress{}, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && reflect.TypeOf(err) != reflect.TypeOf(&ConnectError{}) && err != io.EOF && err != io.ErrUnexpectedEOF { if err != nil && reflect.TypeOf(err) != reflect.TypeOf(&ConnectError{}) && err != io.EOF && err != io.ErrUnexpectedEOF {
log.Println(err) log.Println(err)
} }
@@ -141,7 +141,7 @@ func TestDissect(t *testing.T) {
SrcPort: "2", SrcPort: "2",
DstPort: "1", DstPort: "1",
} }
err = dissector.Dissect(bufferServer, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher) err = dissector.Dissect(bufferServer, &api.ReadProgress{}, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && reflect.TypeOf(err) != reflect.TypeOf(&ConnectError{}) && err != io.EOF && err != io.ErrUnexpectedEOF { if err != nil && reflect.TypeOf(err) != reflect.TypeOf(&ConnectError{}) && err != io.EOF && err != io.ErrUnexpectedEOF {
log.Println(err) log.Println(err)
} }
@@ -320,7 +320,7 @@ func TestRepresent(t *testing.T) {
var objects []string var objects []string
for _, entry := range entries { for _, entry := range entries {
object, _, err := dissector.Represent(entry.Request, entry.Response) object, err := dissector.Represent(entry.Request, entry.Response)
assert.Nil(t, err) assert.Nil(t, err)
objects = append(objects, string(object)) objects = append(objects, string(object))
} }

View File

@@ -22,10 +22,11 @@ func (matcher *requestResponseMatcher) GetMap() *sync.Map {
func (matcher *requestResponseMatcher) SetMaxTry(value int) { func (matcher *requestResponseMatcher) SetMaxTry(value int) {
} }
func (matcher *requestResponseMatcher) registerRequest(ident string, request *RedisPacket, captureTime time.Time) *api.OutputChannelItem { func (matcher *requestResponseMatcher) registerRequest(ident string, request *RedisPacket, captureTime time.Time, captureSize int) *api.OutputChannelItem {
requestRedisMessage := api.GenericMessage{ requestRedisMessage := api.GenericMessage{
IsRequest: true, IsRequest: true,
CaptureTime: captureTime, CaptureTime: captureTime,
CaptureSize: captureSize,
Payload: RedisPayload{ Payload: RedisPayload{
Data: &RedisWrapper{ Data: &RedisWrapper{
Method: string(request.Command), Method: string(request.Command),
@@ -48,10 +49,11 @@ func (matcher *requestResponseMatcher) registerRequest(ident string, request *Re
return nil return nil
} }
func (matcher *requestResponseMatcher) registerResponse(ident string, response *RedisPacket, captureTime time.Time) *api.OutputChannelItem { func (matcher *requestResponseMatcher) registerResponse(ident string, response *RedisPacket, captureTime time.Time, captureSize int) *api.OutputChannelItem {
responseRedisMessage := api.GenericMessage{ responseRedisMessage := api.GenericMessage{
IsRequest: false, IsRequest: false,
CaptureTime: captureTime, CaptureTime: captureTime,
CaptureSize: captureSize,
Payload: RedisPayload{ Payload: RedisPayload{
Data: &RedisWrapper{ Data: &RedisWrapper{
Method: string(response.Command), Method: string(response.Command),

View File

@@ -40,6 +40,7 @@ type tcpReader struct {
isOutgoing bool isOutgoing bool
msgQueue chan tcpReaderDataMsg // Channel of captured reassembled tcp payload msgQueue chan tcpReaderDataMsg // Channel of captured reassembled tcp payload
data []byte data []byte
progress *api.ReadProgress
superTimer *api.SuperTimer superTimer *api.SuperTimer
parent *tcpStream parent *tcpStream
packetsSeen uint packetsSeen uint
@@ -80,6 +81,8 @@ func (h *tcpReader) Read(p []byte) (int, error) {
l := copy(p, h.data) l := copy(p, h.data)
h.data = h.data[l:] h.data = h.data[l:]
h.progress.Feed(l)
return l, nil return l, nil
} }
@@ -96,7 +99,7 @@ func (h *tcpReader) run(wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
b := bufio.NewReader(h) b := bufio.NewReader(h)
// TODO: Add api.Pcap, api.Envoy and api.Linkerd distinction by refactoring NewPacketSourceManager method // TODO: Add api.Pcap, api.Envoy and api.Linkerd distinction by refactoring NewPacketSourceManager method
err := h.extension.Dissector.Dissect(b, api.Pcap, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter, filteringOptions, h.reqResMatcher) err := h.extension.Dissector.Dissect(b, h.progress, api.Pcap, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter, filteringOptions, h.reqResMatcher)
if err != nil { if err != nil {
_, err = io.Copy(ioutil.Discard, b) _, err = io.Copy(ioutil.Discard, b)
if err != nil { if err != nil {

View File

@@ -89,6 +89,7 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
} }
stream.clients = append(stream.clients, tcpReader{ stream.clients = append(stream.clients, tcpReader{
msgQueue: make(chan tcpReaderDataMsg), msgQueue: make(chan tcpReaderDataMsg),
progress: &api.ReadProgress{},
superTimer: &api.SuperTimer{}, superTimer: &api.SuperTimer{},
ident: fmt.Sprintf("%s %s", net, transport), ident: fmt.Sprintf("%s %s", net, transport),
tcpID: &api.TcpID{ tcpID: &api.TcpID{
@@ -108,6 +109,7 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
}) })
stream.servers = append(stream.servers, tcpReader{ stream.servers = append(stream.servers, tcpReader{
msgQueue: make(chan tcpReaderDataMsg), msgQueue: make(chan tcpReaderDataMsg),
progress: &api.ReadProgress{},
superTimer: &api.SuperTimer{}, superTimer: &api.SuperTimer{},
ident: fmt.Sprintf("%s %s", net, transport), ident: fmt.Sprintf("%s %s", net, transport),
tcpID: &api.TcpID{ tcpID: &api.TcpID{

View File

@@ -146,6 +146,7 @@ func (p *tlsPoller) startNewTlsReader(chunk *tlsChunk, ip net.IP, port uint16, k
doneHandler: func(r *tlsReader) { doneHandler: func(r *tlsReader) {
p.closeReader(key, r) p.closeReader(key, r)
}, },
progress: &api.ReadProgress{},
} }
tcpid := p.buildTcpId(chunk, ip, port) tcpid := p.buildTcpId(chunk, ip, port)
@@ -158,7 +159,7 @@ func dissect(extension *api.Extension, reader *tlsReader, isRequest bool, tcpid
emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher api.RequestResponseMatcher) { emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher api.RequestResponseMatcher) {
b := bufio.NewReader(reader) b := bufio.NewReader(reader)
err := extension.Dissector.Dissect(b, api.Ebpf, isRequest, tcpid, &api.CounterPair{}, err := extension.Dissector.Dissect(b, reader.progress, api.Ebpf, isRequest, tcpid, &api.CounterPair{},
&api.SuperTimer{}, &api.SuperIdentifier{}, emitter, options, reqResMatcher) &api.SuperTimer{}, &api.SuperIdentifier{}, emitter, options, reqResMatcher)
if err != nil { if err != nil {

View File

@@ -3,6 +3,8 @@ package tlstapper
import ( import (
"io" "io"
"time" "time"
"github.com/up9inc/mizu/tap/api"
) )
type tlsReader struct { type tlsReader struct {
@@ -10,6 +12,7 @@ type tlsReader struct {
chunks chan *tlsChunk chunks chan *tlsChunk
data []byte data []byte
doneHandler func(r *tlsReader) doneHandler func(r *tlsReader)
progress *api.ReadProgress
} }
func (r *tlsReader) Read(p []byte) (int, error) { func (r *tlsReader) Read(p []byte) (int, error) {
@@ -36,6 +39,7 @@ func (r *tlsReader) Read(p []byte) (int, error) {
l := copy(p, r.data) l := copy(p, r.data)
r.data = r.data[l:] r.data = r.data[l:]
r.progress.Feed(l)
return l, nil return l, nil
} }

View File

@@ -1,6 +1,6 @@
{ {
"name": "@up9/mizu-common", "name": "@up9/mizu-common",
"version": "1.0.129", "version": "1.0.130",
"description": "Made with create-react-library", "description": "Made with create-react-library",
"author": "", "author": "",
"license": "MIT", "license": "MIT",

View File

@@ -36,23 +36,36 @@ const useStyles = makeStyles(() => ({
export const formatSize = (n: number) => n > 1000 ? `${Math.round(n / 1000)}KB` : `${n} B`; export const formatSize = (n: number) => n > 1000 ? `${Math.round(n / 1000)}KB` : `${n} B`;
const EntryTitle: React.FC<any> = ({protocol, data, bodySize, elapsedTime}) => { const EntryTitle: React.FC<any> = ({protocol, data, elapsedTime}) => {
const classes = useStyles(); const classes = useStyles();
const request = data.request;
const response = data.response; const response = data.response;
return <div className={classes.entryTitle}> return <div className={classes.entryTitle}>
<Protocol protocol={protocol} horizontal={true}/> <Protocol protocol={protocol} horizontal={true}/>
<div style={{right: "30px", position: "absolute", display: "flex"}}> <div style={{right: "30px", position: "absolute", display: "flex"}}>
{response && <Queryable {request && <Queryable
query={`response.bodySize == ${bodySize}`} query={`requestSize == ${data.requestSize}`}
style={{margin: "0 18px"}} style={{margin: "0 18px"}}
displayIconOnMouseOver={true} displayIconOnMouseOver={true}
> >
<div <div
style={{opacity: 0.5}} style={{opacity: 0.5}}
id="entryDetailedTitleBodySize" id="entryDetailedTitleRequestSize"
> >
{formatSize(bodySize)} {`Request: ${formatSize(data.requestSize)}`}
</div>
</Queryable>}
{response && <Queryable
query={`responseSize == ${data.responseSize}`}
style={{margin: "0 18px"}}
displayIconOnMouseOver={true}
>
<div
style={{opacity: 0.5}}
id="entryDetailedTitleResponseSize"
>
{`Response: ${formatSize(data.responseSize)}`}
</div> </div>
</Queryable>} </Queryable>}
{response && <Queryable {response && <Queryable
@@ -64,7 +77,7 @@ const EntryTitle: React.FC<any> = ({protocol, data, bodySize, elapsedTime}) => {
style={{opacity: 0.5}} style={{opacity: 0.5}}
id="entryDetailedTitleElapsedTime" id="entryDetailedTitleElapsedTime"
> >
{Math.round(elapsedTime)}ms {`Elapsed Time: ${Math.round(elapsedTime)}ms`}
</div> </div>
</Queryable>} </Queryable>}
</div> </div>
@@ -120,7 +133,6 @@ export const EntryDetailed = () => {
{entryData && <EntryTitle {entryData && <EntryTitle
protocol={entryData.protocol} protocol={entryData.protocol}
data={entryData.data} data={entryData.data}
bodySize={entryData.bodySize}
elapsedTime={entryData.data.elapsedTime} elapsedTime={entryData.data.elapsedTime}
/>} />}
{entryData && <EntrySummary entry={entryData.base}/>} {entryData && <EntrySummary entry={entryData.base}/>}

View File

@@ -4,7 +4,7 @@ import SwapHorizIcon from '@material-ui/icons/SwapHoriz';
import styles from './EntryListItem.module.sass'; import styles from './EntryListItem.module.sass';
import StatusCode, {getClassification, StatusCodeClassification} from "../../UI/StatusCode"; import StatusCode, {getClassification, StatusCodeClassification} from "../../UI/StatusCode";
import Protocol, {ProtocolInterface} from "../../UI/Protocol" import Protocol, {ProtocolInterface} from "../../UI/Protocol"
import eBPFLogo from '../assets/ebpf.png'; import eBPFLogo from '../../assets/ebpf.png';
import {Summary} from "../../UI/Summary"; import {Summary} from "../../UI/Summary";
import Queryable from "../../UI/Queryable"; import Queryable from "../../UI/Queryable";
import ingoingIconSuccess from "assets/ingoing-traffic-success.svg" import ingoingIconSuccess from "assets/ingoing-traffic-success.svg"

View File

Before

Width:  |  Height:  |  Size: 21 KiB

After

Width:  |  Height:  |  Size: 21 KiB

View File

@@ -13,7 +13,7 @@
"@types/jest": "^26.0.22", "@types/jest": "^26.0.22",
"@types/node": "^12.20.10", "@types/node": "^12.20.10",
"@uiw/react-textarea-code-editor": "^1.4.12", "@uiw/react-textarea-code-editor": "^1.4.12",
"@up9/mizu-common": "^1.0.129", "@up9/mizu-common": "^1.0.130",
"axios": "^0.25.0", "axios": "^0.25.0",
"core-js": "^3.20.2", "core-js": "^3.20.2",
"craco-babel-loader": "^1.0.3", "craco-babel-loader": "^1.0.3",