Compare commits

...

5 Commits

Author SHA1 Message Date
RoyUP9
482e5c8b69 Added check pull image flag (#899)
Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
2022-03-21 15:24:03 +02:00
RamiBerm
21902b5f86 Fix tapping status falling out of sync (#898)
Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
2022-03-21 14:54:25 +02:00
gadotroee
a4d0e250c9 Fixing acceptance tests (#900) 2022-03-21 14:21:08 +02:00
M. Mert Yıldıran
5455220a3a Add an indicator for the eBPF sourced entries (#886)
* Define `Capture` type and expect it as an argument in `Dissect` method

* Add an indicator for the  eBPF sourced entries

* Fix the Go lint error

* Fix the logic in the UI

* Update the expected JSONs

* Add TODOs

* Add `UndefinedCapture` constant

* Define `CaptureTypes` enum
2022-03-17 09:32:09 +03:00
RamiBerm
237002ef29 Dependency injection for oas servicemap (#895)
* Update main.go, main.go, and 3 more files...

* WIP

* Update main.go, oas_controller.go, and 3 more files...

* Update main.go, oas_generator.go, and servicemap.go

* Update loader.go and resolver.go

* Update oas_generator.go

* Update oas_generator.go, specgen_test.go, and 3 more files...

* Update service_map_controller_test.go

* Update oas_controller_test.go
2022-03-16 17:21:50 +02:00
65 changed files with 836 additions and 666 deletions

View File

@@ -1,8 +1,7 @@
const columns = {podName : 1, namespace : 2, tapping : 3};
const greenStatusImageSrc = '/static/media/success.662997eb.svg';
function getDomPathInStatusBar(line, column) {
return `.expandedStatusBar > :nth-child(2) > > :nth-child(2) > :nth-child(${line}) > :nth-child(${column})`;
return `[data-cy="expandedStatusBar"] > :nth-child(2) > > :nth-child(2) > :nth-child(${line}) > :nth-child(${column})`;
}
export function checkLine(line, expectedValues) {
@@ -12,14 +11,14 @@ export function checkLine(line, expectedValues) {
cy.get(getDomPathInStatusBar(line, columns.namespace)).invoke('text').then(namespaceValue => {
expect(namespaceValue).to.equal(expectedValues.namespace);
cy.get(getDomPathInStatusBar(line, columns.tapping)).children().should('have.attr', 'src', greenStatusImageSrc);
cy.get(getDomPathInStatusBar(line, columns.tapping)).children().should('have.attr', 'src').and("match", /success.*\.svg/);
});
});
}
export function findLineAndCheck(expectedValues) {
cy.get('.expandedStatusBar > :nth-child(2) > > :nth-child(2) > > :nth-child(1)').then(pods => {
cy.get('.expandedStatusBar > :nth-child(2) > > :nth-child(2) > > :nth-child(2)').then(namespaces => {
cy.get('[data-cy="expandedStatusBar"] > :nth-child(2) > > :nth-child(2) > > :nth-child(1)').then(pods => {
cy.get('[data-cy="expandedStatusBar"] > :nth-child(2) > > :nth-child(2) > > :nth-child(2)').then(namespaces => {
// organizing namespaces array
const podObjectsArray = Object.values(pods ?? {});
const namespacesObjectsArray = Object.values(namespaces ?? {});

View File

@@ -45,7 +45,7 @@ export function leftTextCheck(entryNum, path, expectedText) {
export function leftOnHoverCheck(entryNum, path, filterName) {
cy.get(`#list #entry-${entryNum} ${path}`).trigger('mouseover');
cy.get(`#list #entry-${entryNum} .Queryable-Tooltip`).invoke('text').should('match', new RegExp(filterName));
cy.get(`#list #entry-${entryNum} [data-cy='QueryableTooltip']`).invoke('text').should('match', new RegExp(filterName));
}
export function rightTextCheck(path, expectedText) {
@@ -54,7 +54,7 @@ export function rightTextCheck(path, expectedText) {
export function rightOnHoverCheck(path, expectedText) {
cy.get(`#rightSideContainer ${path}`).trigger('mouseover');
cy.get(`#rightSideContainer .Queryable-Tooltip`).invoke('text').should('match', new RegExp(expectedText));
cy.get(`#rightSideContainer [data-cy='QueryableTooltip']`).invoke('text').should('match', new RegExp(expectedText));
}
export function checkThatAllEntriesShown() {

View File

@@ -8,6 +8,6 @@ it('check', function () {
cy.visit(`http://localhost:${port}`);
cy.wait('@statusTap').its('response.statusCode').should('match', /^2\d{2}/);
cy.get('.podsCount').trigger('mouseover');
cy.get(`[data-cy="expandedStatusBar"]`).trigger('mouseover',{force: true});
findLineAndCheck(getExpectedDetailsDict(podName, namespace));
});

View File

@@ -2,7 +2,7 @@ import {findLineAndCheck, getExpectedDetailsDict} from '../testHelpers/StatusBar
it('opening', function () {
cy.visit(Cypress.env('testUrl'));
cy.get('.podsCount').trigger('mouseover');
cy.get(`[data-cy="podsCountText"]`).trigger('mouseover');
});
[1, 2, 3].map(doItFunc);

View File

@@ -3,9 +3,9 @@ import {getExpectedDetailsDict, checkLine} from '../testHelpers/StatusBarHelper'
it('opening', function () {
cy.visit(Cypress.env('testUrl'));
cy.get('.podsCount').trigger('mouseover');
cy.get(`[data-cy="podsCountText"]`).trigger('mouseover');
cy.get('.expandedStatusBar > :nth-child(2) > > :nth-child(2) >').should('have.length', 1); // one line
cy.get('[data-cy="expandedStatusBar"] > :nth-child(2) > > :nth-child(2) >').should('have.length', 1); // one line
checkLine(1, getExpectedDetailsDict(Cypress.env('name'), Cypress.env('namespace')));
});

View File

@@ -26,7 +26,7 @@ it('opening mizu', function () {
verifyMinimumEntries();
it('top bar check', function () {
cy.get('.podsCount').trigger('mouseover');
cy.get(`[data-cy="podsCountText"]`).trigger('mouseover');
podsArray.map(findLineAndCheck);
cy.reload();
});
@@ -205,6 +205,7 @@ function checkFilter(filterDetails){
// checks the hover on the last entry (the only one in DOM at the beginning)
leftOnHoverCheck(totalEntries - 1, leftSidePath, name);
cy.get('.w-tc-editor-text').clear();
// applying the filter with alt+enter or with the button
cy.get('.w-tc-editor-text').type(`${name}${applyByEnter ? '{alt+enter}' : ''}`);
cy.get('.w-tc-editor').should('have.attr', 'style').and('include', Cypress.env('greenFilterColor'));

View File

@@ -16,6 +16,7 @@ import (
"github.com/gin-contrib/static"
"github.com/gin-gonic/gin"
"github.com/up9inc/mizu/agent/pkg/dependency"
"github.com/up9inc/mizu/agent/pkg/elastic"
"github.com/up9inc/mizu/agent/pkg/middlewares"
"github.com/up9inc/mizu/agent/pkg/models"
@@ -55,6 +56,7 @@ const (
)
func main() {
initializeDependencies()
logLevel := determineLogLevel()
logger.InitLoggerStd(logLevel)
flag.Parse()
@@ -203,10 +205,12 @@ func runInHarReaderMode() {
func enableExpFeatureIfNeeded() {
if config.Config.OAS {
oas.GetOasGeneratorInstance().Start()
oasGenerator := dependency.GetInstance(dependency.OasGeneratorDependency).(oas.OasGenerator)
oasGenerator.Start()
}
if config.Config.ServiceMap {
servicemap.GetInstance().Enable()
serviceMapGenerator := dependency.GetInstance(dependency.ServiceMapGeneratorDependency).(servicemap.ServiceMap)
serviceMapGenerator.Enable()
}
elastic.GetInstance().Configure(config.Config.Elastic)
}
@@ -385,3 +389,8 @@ func handleIncomingMessageAsTapper(socketConnection *websocket.Conn) {
}
}
}
func initializeDependencies() {
dependency.RegisterGenerator(dependency.ServiceMapGeneratorDependency, func() interface{} { return servicemap.GetDefaultServiceMapInstance() })
dependency.RegisterGenerator(dependency.OasGeneratorDependency, func() interface{} { return oas.GetDefaultOasGeneratorInstance() })
}

View File

@@ -11,6 +11,7 @@ import (
"strings"
"time"
"github.com/up9inc/mizu/agent/pkg/dependency"
"github.com/up9inc/mizu/agent/pkg/elastic"
"github.com/up9inc/mizu/agent/pkg/har"
"github.com/up9inc/mizu/agent/pkg/holder"
@@ -151,7 +152,8 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
entryWSource.Destination = mizuEntry.Destination.IP + ":" + mizuEntry.Destination.Port
}
oas.GetOasGeneratorInstance().PushEntry(&entryWSource)
oasGenerator := dependency.GetInstance(dependency.OasGeneratorDependency).(oas.OasGeneratorSink)
oasGenerator.PushEntry(&entryWSource)
}
data, err := json.Marshal(mizuEntry)
@@ -163,7 +165,9 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
connection.SendText(string(data))
servicemap.GetInstance().NewTCPEntry(mizuEntry.Source, mizuEntry.Destination, &item.Protocol)
serviceMapGenerator := dependency.GetInstance(dependency.ServiceMapGeneratorDependency).(servicemap.ServiceMapSink)
serviceMapGenerator.NewTCPEntry(mizuEntry.Source, mizuEntry.Destination, &item.Protocol)
elastic.GetInstance().PushEntry(mizuEntry)
}
}

View File

@@ -5,13 +5,15 @@ import (
"github.com/chanced/openapi"
"github.com/gin-gonic/gin"
"github.com/up9inc/mizu/agent/pkg/dependency"
"github.com/up9inc/mizu/agent/pkg/oas"
"github.com/up9inc/mizu/shared/logger"
)
func GetOASServers(c *gin.Context) {
m := make([]string, 0)
oas.GetOasGeneratorInstance().ServiceSpecs.Range(func(key, value interface{}) bool {
oasGenerator := dependency.GetInstance(dependency.OasGeneratorDependency).(oas.OasGenerator)
oasGenerator.GetServiceSpecs().Range(func(key, value interface{}) bool {
m = append(m, key.(string))
return true
})
@@ -20,7 +22,8 @@ func GetOASServers(c *gin.Context) {
}
func GetOASSpec(c *gin.Context) {
res, ok := oas.GetOasGeneratorInstance().ServiceSpecs.Load(c.Param("id"))
oasGenerator := dependency.GetInstance(dependency.OasGeneratorDependency).(oas.OasGenerator)
res, ok := oasGenerator.GetServiceSpecs().Load(c.Param("id"))
if !ok {
c.JSON(http.StatusNotFound, gin.H{
"error": true,
@@ -48,7 +51,9 @@ func GetOASSpec(c *gin.Context) {
func GetOASAllSpecs(c *gin.Context) {
res := map[string]*openapi.OpenAPI{}
oas.GetOasGeneratorInstance().ServiceSpecs.Range(func(key, value interface{}) bool {
oasGenerator := dependency.GetInstance(dependency.OasGeneratorDependency).(oas.OasGenerator)
oasGenerator.GetServiceSpecs().Range(func(key, value interface{}) bool {
svc := key.(string)
gen := value.(*oas.SpecGen)
spec, err := gen.GetSpec()

View File

@@ -4,36 +4,43 @@ import (
"net/http/httptest"
"testing"
"github.com/up9inc/mizu/agent/pkg/dependency"
"github.com/up9inc/mizu/agent/pkg/oas"
"github.com/gin-gonic/gin"
)
func TestGetOASServers(t *testing.T) {
dependency.RegisterGenerator(dependency.OasGeneratorDependency, func() interface{} { return oas.GetDefaultOasGeneratorInstance() })
recorder := httptest.NewRecorder()
c, _ := gin.CreateTestContext(recorder)
oas.GetOasGeneratorInstance().Start()
oas.GetOasGeneratorInstance().ServiceSpecs.Store("some", oas.NewGen("some"))
oas.GetDefaultOasGeneratorInstance().Start()
oas.GetDefaultOasGeneratorInstance().GetServiceSpecs().Store("some", oas.NewGen("some"))
GetOASServers(c)
t.Logf("Written body: %s", recorder.Body.String())
}
func TestGetOASAllSpecs(t *testing.T) {
dependency.RegisterGenerator(dependency.OasGeneratorDependency, func() interface{} { return oas.GetDefaultOasGeneratorInstance() })
recorder := httptest.NewRecorder()
c, _ := gin.CreateTestContext(recorder)
oas.GetOasGeneratorInstance().Start()
oas.GetOasGeneratorInstance().ServiceSpecs.Store("some", oas.NewGen("some"))
oas.GetDefaultOasGeneratorInstance().Start()
oas.GetDefaultOasGeneratorInstance().GetServiceSpecs().Store("some", oas.NewGen("some"))
GetOASAllSpecs(c)
t.Logf("Written body: %s", recorder.Body.String())
}
func TestGetOASSpec(t *testing.T) {
dependency.RegisterGenerator(dependency.OasGeneratorDependency, func() interface{} { return oas.GetDefaultOasGeneratorInstance() })
recorder := httptest.NewRecorder()
c, _ := gin.CreateTestContext(recorder)
oas.GetOasGeneratorInstance().Start()
oas.GetOasGeneratorInstance().ServiceSpecs.Store("some", oas.NewGen("some"))
oas.GetDefaultOasGeneratorInstance().Start()
oas.GetDefaultOasGeneratorInstance().GetServiceSpecs().Store("some", oas.NewGen("some"))
c.Params = []gin.Param{{Key: "id", Value: "some"}}

View File

@@ -3,6 +3,7 @@ package controllers
import (
"net/http"
"github.com/up9inc/mizu/agent/pkg/dependency"
"github.com/up9inc/mizu/agent/pkg/servicemap"
"github.com/gin-gonic/gin"
@@ -13,8 +14,9 @@ type ServiceMapController struct {
}
func NewServiceMapController() *ServiceMapController {
serviceMapGenerator := dependency.GetInstance(dependency.ServiceMapGeneratorDependency).(servicemap.ServiceMap)
return &ServiceMapController{
service: servicemap.GetInstance(),
service: serviceMapGenerator,
}
}

View File

@@ -7,6 +7,7 @@ import (
"net/http/httptest"
"testing"
"github.com/up9inc/mizu/agent/pkg/dependency"
"github.com/up9inc/mizu/agent/pkg/servicemap"
"github.com/gin-gonic/gin"
@@ -57,9 +58,11 @@ type ServiceMapControllerSuite struct {
}
func (s *ServiceMapControllerSuite) SetupTest() {
dependency.RegisterGenerator(dependency.ServiceMapGeneratorDependency, func() interface{} { return servicemap.GetDefaultServiceMapInstance() })
s.c = NewServiceMapController()
s.c.service.Enable()
s.c.service.NewTCPEntry(TCPEntryA, TCPEntryB, ProtocolHttp)
s.c.service.(servicemap.ServiceMapSink).NewTCPEntry(TCPEntryA, TCPEntryB, ProtocolHttp)
s.w = httptest.NewRecorder()
s.g, _ = gin.CreateTestContext(s.w)

View File

@@ -0,0 +1,11 @@
package dependency
var typeIntializerMap = make(map[DependencyContainerType]func() interface{}, 0)
func RegisterGenerator(name DependencyContainerType, fn func() interface{}) {
typeIntializerMap[name] = fn
}
func GetInstance(name DependencyContainerType) interface{} {
return typeIntializerMap[name]()
}

View File

@@ -0,0 +1,8 @@
package dependency
type DependencyContainerType string
const (
ServiceMapGeneratorDependency = "ServiceMapGeneratorDependency"
OasGeneratorDependency = "OasGeneratorDependency"
)

View File

@@ -147,9 +147,9 @@ func feedEntry(entry *har.Entry, source string, isSync bool, file string) {
ews := EntryWithSource{Entry: *entry, Source: source, Destination: u.Host, Id: uint(0)}
if isSync {
GetOasGeneratorInstance().entriesChan <- ews // blocking variant, right?
GetDefaultOasGeneratorInstance().entriesChan <- ews // blocking variant, right?
} else {
GetOasGeneratorInstance().PushEntry(&ews)
GetDefaultOasGeneratorInstance().PushEntry(&ews)
}
}

View File

@@ -12,18 +12,38 @@ import (
var (
syncOnce sync.Once
instance *oasGenerator
instance *defaultOasGenerator
)
func GetOasGeneratorInstance() *oasGenerator {
type OasGeneratorSink interface {
PushEntry(entryWithSource *EntryWithSource)
}
type OasGenerator interface {
Start()
Stop()
IsStarted() bool
Reset()
GetServiceSpecs() *sync.Map
}
type defaultOasGenerator struct {
started bool
ctx context.Context
cancel context.CancelFunc
serviceSpecs *sync.Map
entriesChan chan EntryWithSource
}
func GetDefaultOasGeneratorInstance() *defaultOasGenerator {
syncOnce.Do(func() {
instance = newOasGenerator()
instance = NewDefaultOasGenerator()
logger.Log.Debug("OAS Generator Initialized")
})
return instance
}
func (g *oasGenerator) Start() {
func (g *defaultOasGenerator) Start() {
if g.started {
return
}
@@ -31,12 +51,12 @@ func (g *oasGenerator) Start() {
g.cancel = cancel
g.ctx = ctx
g.entriesChan = make(chan EntryWithSource, 100) // buffer up to 100 entries for OAS processing
g.ServiceSpecs = &sync.Map{}
g.serviceSpecs = &sync.Map{}
g.started = true
go instance.runGenerator()
go g.runGenerator()
}
func (g *oasGenerator) Stop() {
func (g *defaultOasGenerator) Stop() {
if !g.started {
return
}
@@ -45,11 +65,11 @@ func (g *oasGenerator) Stop() {
g.started = false
}
func (g *oasGenerator) IsStarted() bool {
func (g *defaultOasGenerator) IsStarted() bool {
return g.started
}
func (g *oasGenerator) runGenerator() {
func (g *defaultOasGenerator) runGenerator() {
for {
select {
case <-g.ctx.Done():
@@ -67,11 +87,11 @@ func (g *oasGenerator) runGenerator() {
logger.Log.Errorf("Failed to parse entry URL: %v, err: %v", entry.Request.URL, err)
}
val, found := g.ServiceSpecs.Load(entryWithSource.Destination)
val, found := g.serviceSpecs.Load(entryWithSource.Destination)
var gen *SpecGen
if !found {
gen = NewGen(u.Scheme + "://" + entryWithSource.Destination)
g.ServiceSpecs.Store(entryWithSource.Destination, gen)
g.serviceSpecs.Store(entryWithSource.Destination, gen)
} else {
gen = val.(*SpecGen)
}
@@ -92,11 +112,11 @@ func (g *oasGenerator) runGenerator() {
}
}
func (g *oasGenerator) Reset() {
g.ServiceSpecs = &sync.Map{}
func (g *defaultOasGenerator) Reset() {
g.serviceSpecs = &sync.Map{}
}
func (g *oasGenerator) PushEntry(entryWithSource *EntryWithSource) {
func (g *defaultOasGenerator) PushEntry(entryWithSource *EntryWithSource) {
if !g.started {
return
}
@@ -107,12 +127,16 @@ func (g *oasGenerator) PushEntry(entryWithSource *EntryWithSource) {
}
}
func newOasGenerator() *oasGenerator {
return &oasGenerator{
func (g *defaultOasGenerator) GetServiceSpecs() *sync.Map {
return g.serviceSpecs
}
func NewDefaultOasGenerator() *defaultOasGenerator {
return &defaultOasGenerator{
started: false,
ctx: nil,
cancel: nil,
ServiceSpecs: nil,
serviceSpecs: nil,
entriesChan: nil,
}
}
@@ -123,11 +147,3 @@ type EntryWithSource struct {
Entry har.Entry
Id uint
}
type oasGenerator struct {
started bool
ctx context.Context
cancel context.CancelFunc
ServiceSpecs *sync.Map
entriesChan chan EntryWithSource
}

View File

@@ -2,10 +2,6 @@ package oas
import (
"encoding/json"
"github.com/chanced/openapi"
"github.com/op/go-logging"
"github.com/up9inc/mizu/shared/logger"
"github.com/wI2L/jsondiff"
"io/ioutil"
"os"
"regexp"
@@ -13,6 +9,11 @@ import (
"testing"
"time"
"github.com/chanced/openapi"
"github.com/op/go-logging"
"github.com/up9inc/mizu/shared/logger"
"github.com/wI2L/jsondiff"
"github.com/up9inc/mizu/agent/pkg/har"
)
@@ -47,14 +48,14 @@ func TestEntries(t *testing.T) {
t.Log(err)
t.FailNow()
}
GetOasGeneratorInstance().Start()
GetDefaultOasGeneratorInstance().Start()
loadStartingOAS("test_artifacts/catalogue.json", "catalogue")
loadStartingOAS("test_artifacts/trcc.json", "trcc-api-service")
go func() {
for {
time.Sleep(1 * time.Second)
GetOasGeneratorInstance().ServiceSpecs.Range(func(key, val interface{}) bool {
GetDefaultOasGeneratorInstance().GetServiceSpecs().Range(func(key, val interface{}) bool {
svc := key.(string)
t.Logf("Getting spec for %s", svc)
gen := val.(*SpecGen)
@@ -76,7 +77,7 @@ func TestEntries(t *testing.T) {
waitQueueProcessed()
svcs := strings.Builder{}
GetOasGeneratorInstance().ServiceSpecs.Range(func(key, val interface{}) bool {
GetDefaultOasGeneratorInstance().GetServiceSpecs().Range(func(key, val interface{}) bool {
gen := val.(*SpecGen)
svc := key.(string)
svcs.WriteString(svc + ",")
@@ -98,7 +99,7 @@ func TestEntries(t *testing.T) {
return true
})
GetOasGeneratorInstance().ServiceSpecs.Range(func(key, val interface{}) bool {
GetDefaultOasGeneratorInstance().GetServiceSpecs().Range(func(key, val interface{}) bool {
svc := key.(string)
gen := val.(*SpecGen)
spec, err := gen.GetSpec()
@@ -122,8 +123,8 @@ func TestEntries(t *testing.T) {
}
func TestFileSingle(t *testing.T) {
GetOasGeneratorInstance().Start()
GetOasGeneratorInstance().Reset()
GetDefaultOasGeneratorInstance().Start()
GetDefaultOasGeneratorInstance().Reset()
// loadStartingOAS()
file := "test_artifacts/params.har"
files := []string{file}
@@ -135,7 +136,7 @@ func TestFileSingle(t *testing.T) {
waitQueueProcessed()
GetOasGeneratorInstance().ServiceSpecs.Range(func(key, val interface{}) bool {
GetDefaultOasGeneratorInstance().GetServiceSpecs().Range(func(key, val interface{}) bool {
svc := key.(string)
gen := val.(*SpecGen)
spec, err := gen.GetSpec()
@@ -191,7 +192,7 @@ func TestFileSingle(t *testing.T) {
func waitQueueProcessed() {
for {
time.Sleep(100 * time.Millisecond)
queue := len(GetOasGeneratorInstance().entriesChan)
queue := len(GetDefaultOasGeneratorInstance().entriesChan)
logger.Log.Infof("Queue: %d", queue)
if queue < 1 {
break
@@ -221,7 +222,7 @@ func loadStartingOAS(file string, label string) {
gen := NewGen(label)
gen.StartFromSpec(doc)
GetOasGeneratorInstance().ServiceSpecs.Store(label, gen)
GetDefaultOasGeneratorInstance().GetServiceSpecs().Store(label, gen)
}
func TestEntriesNegative(t *testing.T) {

View File

@@ -1,9 +1,10 @@
package oas
import (
"github.com/chanced/openapi"
"strings"
"testing"
"github.com/chanced/openapi"
)
func TestTree(t *testing.T) {

View File

@@ -10,7 +10,7 @@ import (
restclient "k8s.io/client-go/rest"
)
func NewFromInCluster(errOut chan error, namesapce string) (*Resolver, error) {
func NewFromInCluster(errOut chan error, namespace string) (*Resolver, error) {
config, err := restclient.InClusterConfig()
if err != nil {
return nil, err
@@ -19,5 +19,5 @@ func NewFromInCluster(errOut chan error, namesapce string) (*Resolver, error) {
if err != nil {
return nil, err
}
return &Resolver{clientConfig: config, clientSet: clientset, nameMap: cmap.New(), serviceMap: cmap.New(), errOut: errOut, namespace: namesapce}, nil
return &Resolver{clientConfig: config, clientSet: clientset, nameMap: cmap.New(), serviceMap: cmap.New(), errOut: errOut, namespace: namespace}, nil
}

View File

@@ -168,11 +168,13 @@ func (resolver *Resolver) watchServices(ctx context.Context) error {
func (resolver *Resolver) saveResolvedName(key string, resolved string, namespace string, eventType watch.EventType) {
if eventType == watch.Deleted {
resolver.nameMap.Remove(resolved)
resolver.nameMap.Remove(key)
logger.Log.Infof("setting %s=nil", key)
} else {
resolver.nameMap.Set(key, &ResolvedObjectInfo{FullAddress: resolved, Namespace: namespace})
resolver.nameMap.Set(resolved, &ResolvedObjectInfo{FullAddress: resolved, Namespace: namespace})
logger.Log.Infof("setting %s=%s", key, resolved)
}
}

View File

@@ -13,28 +13,31 @@ const (
UnresolvedNodeName = "unresolved"
)
var instance *serviceMap
var instance *defaultServiceMap
var once sync.Once
func GetInstance() ServiceMap {
func GetDefaultServiceMapInstance() *defaultServiceMap {
once.Do(func() {
instance = newServiceMap()
instance = NewDefaultServiceMapGenerator()
logger.Log.Debug("Service Map Initialized")
})
return instance
}
type serviceMap struct {
type defaultServiceMap struct {
enabled bool
graph *graph
entriesProcessed int
}
type ServiceMapSink interface {
NewTCPEntry(source *tapApi.TCP, destination *tapApi.TCP, protocol *tapApi.Protocol)
}
type ServiceMap interface {
Enable()
Disable()
IsEnabled() bool
NewTCPEntry(source *tapApi.TCP, destination *tapApi.TCP, protocol *tapApi.Protocol)
GetStatus() ServiceMapStatus
GetNodes() []ServiceMapNode
GetEdges() []ServiceMapEdge
@@ -44,8 +47,8 @@ type ServiceMap interface {
Reset()
}
func newServiceMap() *serviceMap {
return &serviceMap{
func NewDefaultServiceMapGenerator() *defaultServiceMap {
return &defaultServiceMap{
enabled: false,
entriesProcessed: 0,
graph: newDirectedGraph(),
@@ -105,12 +108,12 @@ func newEdgeData(p *tapApi.Protocol) *edgeData {
}
}
func (s *serviceMap) nodeExists(k key) (*nodeData, bool) {
func (s *defaultServiceMap) nodeExists(k key) (*nodeData, bool) {
n, ok := s.graph.Nodes[k]
return n, ok
}
func (s *serviceMap) addNode(k key, e *tapApi.TCP) (*nodeData, bool) {
func (s *defaultServiceMap) addNode(k key, e *tapApi.TCP) (*nodeData, bool) {
nd, exists := s.nodeExists(k)
if !exists {
s.graph.Nodes[k] = newNodeData(len(s.graph.Nodes)+1, e)
@@ -119,7 +122,7 @@ func (s *serviceMap) addNode(k key, e *tapApi.TCP) (*nodeData, bool) {
return nd, false
}
func (s *serviceMap) addEdge(u, v *entryData, p *tapApi.Protocol) {
func (s *defaultServiceMap) addEdge(u, v *entryData, p *tapApi.Protocol) {
if n, ok := s.addNode(u.key, u.entry); !ok {
n.count++
}
@@ -156,20 +159,20 @@ func (s *serviceMap) addEdge(u, v *entryData, p *tapApi.Protocol) {
s.entriesProcessed++
}
func (s *serviceMap) Enable() {
func (s *defaultServiceMap) Enable() {
s.enabled = true
}
func (s *serviceMap) Disable() {
func (s *defaultServiceMap) Disable() {
s.Reset()
s.enabled = false
}
func (s *serviceMap) IsEnabled() bool {
func (s *defaultServiceMap) IsEnabled() bool {
return s.enabled
}
func (s *serviceMap) NewTCPEntry(src *tapApi.TCP, dst *tapApi.TCP, p *tapApi.Protocol) {
func (s *defaultServiceMap) NewTCPEntry(src *tapApi.TCP, dst *tapApi.TCP, p *tapApi.Protocol) {
if !s.IsEnabled() {
return
}
@@ -206,7 +209,7 @@ func (s *serviceMap) NewTCPEntry(src *tapApi.TCP, dst *tapApi.TCP, p *tapApi.Pro
s.addEdge(srcEntry, dstEntry, p)
}
func (s *serviceMap) GetStatus() ServiceMapStatus {
func (s *defaultServiceMap) GetStatus() ServiceMapStatus {
status := ServiceMapDisabled
if s.IsEnabled() {
status = ServiceMapEnabled
@@ -220,7 +223,7 @@ func (s *serviceMap) GetStatus() ServiceMapStatus {
}
}
func (s *serviceMap) GetNodes() []ServiceMapNode {
func (s *defaultServiceMap) GetNodes() []ServiceMapNode {
var nodes []ServiceMapNode
for i, n := range s.graph.Nodes {
nodes = append(nodes, ServiceMapNode{
@@ -233,7 +236,7 @@ func (s *serviceMap) GetNodes() []ServiceMapNode {
return nodes
}
func (s *serviceMap) GetEdges() []ServiceMapEdge {
func (s *defaultServiceMap) GetEdges() []ServiceMapEdge {
var edges []ServiceMapEdge
for u, m := range s.graph.Edges {
for v := range m {
@@ -260,15 +263,15 @@ func (s *serviceMap) GetEdges() []ServiceMapEdge {
return edges
}
func (s *serviceMap) GetEntriesProcessedCount() int {
func (s *defaultServiceMap) GetEntriesProcessedCount() int {
return s.entriesProcessed
}
func (s *serviceMap) GetNodesCount() int {
func (s *defaultServiceMap) GetNodesCount() int {
return len(s.graph.Nodes)
}
func (s *serviceMap) GetEdgesCount() int {
func (s *defaultServiceMap) GetEdgesCount() int {
var count int
for u, m := range s.graph.Edges {
for v := range m {
@@ -280,7 +283,7 @@ func (s *serviceMap) GetEdgesCount() int {
return count
}
func (s *serviceMap) Reset() {
func (s *defaultServiceMap) Reset() {
s.entriesProcessed = 0
s.graph = newDirectedGraph()
}

View File

@@ -80,21 +80,21 @@ var (
type ServiceMapDisabledSuite struct {
suite.Suite
instance ServiceMap
instance *defaultServiceMap
}
type ServiceMapEnabledSuite struct {
suite.Suite
instance ServiceMap
instance *defaultServiceMap
}
func (s *ServiceMapDisabledSuite) SetupTest() {
s.instance = GetInstance()
s.instance = GetDefaultServiceMapInstance()
}
func (s *ServiceMapEnabledSuite) SetupTest() {
s.instance = GetInstance()
s.instance = GetDefaultServiceMapInstance()
s.instance.Enable()
}
@@ -107,7 +107,7 @@ func (s *ServiceMapDisabledSuite) TestServiceMapInstance() {
func (s *ServiceMapDisabledSuite) TestServiceMapSingletonInstance() {
assert := s.Assert()
instance2 := GetInstance()
instance2 := GetDefaultServiceMapInstance()
assert.NotNil(s.instance)
assert.NotNil(instance2)

View File

@@ -27,4 +27,5 @@ func init() {
}
checkCmd.Flags().Bool(configStructs.PreTapCheckName, defaultCheckConfig.PreTap, "Check pre-tap Mizu installation for potential problems")
checkCmd.Flags().Bool(configStructs.ImagePullCheckName, defaultCheckConfig.ImagePull, "Test connectivity to container image registry by creating and removing a temporary pod in 'default' namespace")
}

View File

@@ -0,0 +1,102 @@
package check
import (
"context"
"fmt"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
core "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"regexp"
"time"
)
func ImagePullInCluster(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool {
logger.Log.Infof("\nimage-pull-in-cluster\n--------------------")
namespace := "default"
podName := "mizu-test"
defer func() {
if err := kubernetesProvider.RemovePod(ctx, namespace, podName); err != nil {
logger.Log.Errorf("%v error while removing test pod in cluster, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
}
}()
if err := createImagePullInClusterPod(ctx, kubernetesProvider, namespace, podName); err != nil {
logger.Log.Errorf("%v error while creating test pod in cluster, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
return false
}
if err := checkImagePulled(ctx, kubernetesProvider, namespace, podName); err != nil {
logger.Log.Errorf("%v cluster is not able to pull mizu containers from docker hub, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
return false
}
logger.Log.Infof("%v cluster is able to pull mizu containers from docker hub", fmt.Sprintf(uiUtils.Green, "√"))
return true
}
func checkImagePulled(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespace string, podName string) error {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", podName))
podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
eventChan, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{namespace}, podWatchHelper)
timeAfter := time.After(30 * time.Second)
for {
select {
case wEvent, ok := <-eventChan:
if !ok {
eventChan = nil
continue
}
pod, err := wEvent.ToPod()
if err != nil {
return err
}
if pod.Status.Phase == core.PodRunning {
return nil
}
case err, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
return err
case <-timeAfter:
return fmt.Errorf("image not pulled in time")
}
}
}
func createImagePullInClusterPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespace string, podName string) error {
var zero int64
pod := &core.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
},
Spec: core.PodSpec{
Containers: []core.Container{
{
Name: "probe",
Image: "up9inc/busybox",
ImagePullPolicy: "Always",
Command: []string{"cat"},
Stdin: true,
},
},
TerminationGracePeriodSeconds: &zero,
},
}
if _, err := kubernetesProvider.CreatePod(ctx, namespace, pod); err != nil {
return err
}
return nil
}

View File

@@ -0,0 +1,31 @@
package check
import (
"fmt"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/shared/semver"
)
func KubernetesApi() (*kubernetes.Provider, *semver.SemVersion, bool) {
logger.Log.Infof("\nkubernetes-api\n--------------------")
kubernetesProvider, err := kubernetes.NewProvider(config.Config.KubeConfigPath(), config.Config.KubeContext)
if err != nil {
logger.Log.Errorf("%v can't initialize the client, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
return nil, nil, false
}
logger.Log.Infof("%v can initialize the client", fmt.Sprintf(uiUtils.Green, "√"))
kubernetesVersion, err := kubernetesProvider.GetKubernetesVersion()
if err != nil {
logger.Log.Errorf("%v can't query the Kubernetes API, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
return nil, nil, false
}
logger.Log.Infof("%v can query the Kubernetes API", fmt.Sprintf(uiUtils.Green, "√"))
return kubernetesProvider, kubernetesVersion, true
}

View File

@@ -0,0 +1,87 @@
package check
import (
"context"
"embed"
"fmt"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
rbac "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
)
func TapKubernetesPermissions(ctx context.Context, embedFS embed.FS, kubernetesProvider *kubernetes.Provider) bool {
logger.Log.Infof("\nkubernetes-permissions\n--------------------")
var filePath string
if config.Config.IsNsRestrictedMode() {
filePath = "permissionFiles/permissions-ns-tap.yaml"
} else {
filePath = "permissionFiles/permissions-all-namespaces-tap.yaml"
}
data, err := embedFS.ReadFile(filePath)
if err != nil {
logger.Log.Errorf("%v error while checking kubernetes permissions, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
return false
}
obj, err := getDecodedObject(data)
if err != nil {
logger.Log.Errorf("%v error while checking kubernetes permissions, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
return false
}
var rules []rbac.PolicyRule
if config.Config.IsNsRestrictedMode() {
rules = obj.(*rbac.Role).Rules
} else {
rules = obj.(*rbac.ClusterRole).Rules
}
return checkPermissions(ctx, kubernetesProvider, rules)
}
func getDecodedObject(data []byte) (runtime.Object, error) {
decode := scheme.Codecs.UniversalDeserializer().Decode
obj, _, err := decode(data, nil, nil)
if err != nil {
return nil, err
}
return obj, nil
}
func checkPermissions(ctx context.Context, kubernetesProvider *kubernetes.Provider, rules []rbac.PolicyRule) bool {
permissionsExist := true
for _, rule := range rules {
for _, group := range rule.APIGroups {
for _, resource := range rule.Resources {
for _, verb := range rule.Verbs {
exist, err := kubernetesProvider.CanI(ctx, config.Config.MizuResourcesNamespace, resource, verb, group)
permissionsExist = checkPermissionExist(group, resource, verb, exist, err) && permissionsExist
}
}
}
}
return permissionsExist
}
func checkPermissionExist(group string, resource string, verb string, exist bool, err error) bool {
if err != nil {
logger.Log.Errorf("%v error checking permission for %v %v in group '%v', err: %v", fmt.Sprintf(uiUtils.Red, "✗"), verb, resource, group, err)
return false
} else if !exist {
logger.Log.Errorf("%v can't %v %v in group '%v'", fmt.Sprintf(uiUtils.Red, "✗"), verb, resource, group)
return false
}
logger.Log.Infof("%v can %v %v in group '%v'", fmt.Sprintf(uiUtils.Green, "√"), verb, resource, group)
return true
}

View File

@@ -0,0 +1,95 @@
package check
import (
"context"
"fmt"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
)
func KubernetesResources(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool {
logger.Log.Infof("\nk8s-components\n--------------------")
exist, err := kubernetesProvider.DoesNamespaceExist(ctx, config.Config.MizuResourcesNamespace)
allResourcesExist := checkResourceExist(config.Config.MizuResourcesNamespace, "namespace", exist, err)
exist, err = kubernetesProvider.DoesConfigMapExist(ctx, config.Config.MizuResourcesNamespace, kubernetes.ConfigMapName)
allResourcesExist = checkResourceExist(kubernetes.ConfigMapName, "config map", exist, err) && allResourcesExist
exist, err = kubernetesProvider.DoesServiceAccountExist(ctx, config.Config.MizuResourcesNamespace, kubernetes.ServiceAccountName)
allResourcesExist = checkResourceExist(kubernetes.ServiceAccountName, "service account", exist, err) && allResourcesExist
if config.Config.IsNsRestrictedMode() {
exist, err = kubernetesProvider.DoesRoleExist(ctx, config.Config.MizuResourcesNamespace, kubernetes.RoleName)
allResourcesExist = checkResourceExist(kubernetes.RoleName, "role", exist, err) && allResourcesExist
exist, err = kubernetesProvider.DoesRoleBindingExist(ctx, config.Config.MizuResourcesNamespace, kubernetes.RoleBindingName)
allResourcesExist = checkResourceExist(kubernetes.RoleBindingName, "role binding", exist, err) && allResourcesExist
} else {
exist, err = kubernetesProvider.DoesClusterRoleExist(ctx, kubernetes.ClusterRoleName)
allResourcesExist = checkResourceExist(kubernetes.ClusterRoleName, "cluster role", exist, err) && allResourcesExist
exist, err = kubernetesProvider.DoesClusterRoleBindingExist(ctx, kubernetes.ClusterRoleBindingName)
allResourcesExist = checkResourceExist(kubernetes.ClusterRoleBindingName, "cluster role binding", exist, err) && allResourcesExist
}
exist, err = kubernetesProvider.DoesServiceExist(ctx, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName)
allResourcesExist = checkResourceExist(kubernetes.ApiServerPodName, "service", exist, err) && allResourcesExist
allResourcesExist = checkPodResourcesExist(ctx, kubernetesProvider) && allResourcesExist
return allResourcesExist
}
func checkPodResourcesExist(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool {
if pods, err := kubernetesProvider.ListPodsByAppLabel(ctx, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName); err != nil {
logger.Log.Errorf("%v error checking if '%v' pod is running, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), kubernetes.ApiServerPodName, err)
return false
} else if len(pods) == 0 {
logger.Log.Errorf("%v '%v' pod doesn't exist", fmt.Sprintf(uiUtils.Red, "✗"), kubernetes.ApiServerPodName)
return false
} else if !kubernetes.IsPodRunning(&pods[0]) {
logger.Log.Errorf("%v '%v' pod not running", fmt.Sprintf(uiUtils.Red, "✗"), kubernetes.ApiServerPodName)
return false
}
logger.Log.Infof("%v '%v' pod running", fmt.Sprintf(uiUtils.Green, "√"), kubernetes.ApiServerPodName)
if pods, err := kubernetesProvider.ListPodsByAppLabel(ctx, config.Config.MizuResourcesNamespace, kubernetes.TapperPodName); err != nil {
logger.Log.Errorf("%v error checking if '%v' pods are running, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), kubernetes.TapperPodName, err)
return false
} else {
tappers := 0
notRunningTappers := 0
for _, pod := range pods {
tappers += 1
if !kubernetes.IsPodRunning(&pod) {
notRunningTappers += 1
}
}
if notRunningTappers > 0 {
logger.Log.Errorf("%v '%v' %v/%v pods are not running", fmt.Sprintf(uiUtils.Red, "✗"), kubernetes.TapperPodName, notRunningTappers, tappers)
return false
}
logger.Log.Infof("%v '%v' %v pods running", fmt.Sprintf(uiUtils.Green, "√"), kubernetes.TapperPodName, tappers)
return true
}
}
func checkResourceExist(resourceName string, resourceType string, exist bool, err error) bool {
if err != nil {
logger.Log.Errorf("%v error checking if '%v' %v exists, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), resourceName, resourceType, err)
return false
} else if !exist {
logger.Log.Errorf("%v '%v' %v doesn't exist", fmt.Sprintf(uiUtils.Red, "✗"), resourceName, resourceType)
return false
}
logger.Log.Infof("%v '%v' %v exists", fmt.Sprintf(uiUtils.Green, "√"), resourceName, resourceType)
return true
}

View File

@@ -0,0 +1,21 @@
package check
import (
"fmt"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/shared/semver"
)
func KubernetesVersion(kubernetesVersion *semver.SemVersion) bool {
logger.Log.Infof("\nkubernetes-version\n--------------------")
if err := kubernetes.ValidateKubernetesVersion(kubernetesVersion); err != nil {
logger.Log.Errorf("%v not running the minimum Kubernetes API version, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
return false
}
logger.Log.Infof("%v is running the minimum Kubernetes API version", fmt.Sprintf(uiUtils.Green, "√"))
return true
}

View File

@@ -0,0 +1,83 @@
package check
import (
"context"
"fmt"
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
"regexp"
)
func ServerConnection(kubernetesProvider *kubernetes.Provider) bool {
logger.Log.Infof("\nAPI-server-connectivity\n--------------------")
serverUrl := fmt.Sprintf("http://%s", kubernetes.GetMizuApiServerProxiedHostAndPath(config.Config.Tap.GuiPort))
apiServerProvider := apiserver.NewProvider(serverUrl, 1, apiserver.DefaultTimeout)
if err := apiServerProvider.TestConnection(); err == nil {
logger.Log.Infof("%v found Mizu server tunnel available and connected successfully to API server", fmt.Sprintf(uiUtils.Green, "√"))
return true
}
connectedToApiServer := false
if err := checkProxy(serverUrl, kubernetesProvider); err != nil {
logger.Log.Errorf("%v couldn't connect to API server using proxy, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
} else {
connectedToApiServer = true
logger.Log.Infof("%v connected successfully to API server using proxy", fmt.Sprintf(uiUtils.Green, "√"))
}
if err := checkPortForward(serverUrl, kubernetesProvider); err != nil {
logger.Log.Errorf("%v couldn't connect to API server using port-forward, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
} else {
connectedToApiServer = true
logger.Log.Infof("%v connected successfully to API server using port-forward", fmt.Sprintf(uiUtils.Green, "√"))
}
return connectedToApiServer
}
func checkProxy(serverUrl string, kubernetesProvider *kubernetes.Provider) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
httpServer, err := kubernetes.StartProxy(kubernetesProvider, config.Config.Tap.ProxyHost, config.Config.Tap.GuiPort, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName, cancel)
if err != nil {
return err
}
apiServerProvider := apiserver.NewProvider(serverUrl, apiserver.DefaultRetries, apiserver.DefaultTimeout)
if err := apiServerProvider.TestConnection(); err != nil {
return err
}
if err := httpServer.Shutdown(ctx); err != nil {
logger.Log.Debugf("Error occurred while stopping proxy, err: %v", err)
}
return nil
}
func checkPortForward(serverUrl string, kubernetesProvider *kubernetes.Provider) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
podRegex, _ := regexp.Compile(kubernetes.ApiServerPodName)
forwarder, err := kubernetes.NewPortForward(kubernetesProvider, config.Config.MizuResourcesNamespace, podRegex, config.Config.Tap.GuiPort, ctx, cancel)
if err != nil {
return err
}
apiServerProvider := apiserver.NewProvider(serverUrl, apiserver.DefaultRetries, apiserver.DefaultTimeout)
if err := apiServerProvider.TestConnection(); err != nil {
return err
}
forwarder.Close()
return nil
}

View File

@@ -4,20 +4,10 @@ import (
"context"
"embed"
"fmt"
core "k8s.io/api/core/v1"
rbac "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
"regexp"
"time"
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/cmd/check"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/shared/semver"
)
var (
@@ -31,27 +21,29 @@ func runMizuCheck() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // cancel will be called when this function exits
kubernetesProvider, kubernetesVersion, checkPassed := checkKubernetesApi()
kubernetesProvider, kubernetesVersion, checkPassed := check.KubernetesApi()
if checkPassed {
checkPassed = checkKubernetesVersion(kubernetesVersion)
checkPassed = check.KubernetesVersion(kubernetesVersion)
}
if config.Config.Check.PreTap {
if checkPassed {
checkPassed = checkK8sTapPermissions(ctx, kubernetesProvider)
}
if checkPassed {
checkPassed = checkImagePullInCluster(ctx, kubernetesProvider)
checkPassed = check.TapKubernetesPermissions(ctx, embedFS, kubernetesProvider)
}
} else {
if checkPassed {
checkPassed = checkK8sResources(ctx, kubernetesProvider)
checkPassed = check.KubernetesResources(ctx, kubernetesProvider)
}
if checkPassed {
checkPassed = checkServerConnection(kubernetesProvider)
checkPassed = check.ServerConnection(kubernetesProvider)
}
}
if config.Config.Check.ImagePull {
if checkPassed {
checkPassed = check.ImagePullInCluster(ctx, kubernetesProvider)
}
}
@@ -61,365 +53,3 @@ func runMizuCheck() {
logger.Log.Errorf("\nStatus check results are %v", fmt.Sprintf(uiUtils.Red, "✗"))
}
}
func checkKubernetesApi() (*kubernetes.Provider, *semver.SemVersion, bool) {
logger.Log.Infof("\nkubernetes-api\n--------------------")
kubernetesProvider, err := kubernetes.NewProvider(config.Config.KubeConfigPath(), config.Config.KubeContext)
if err != nil {
logger.Log.Errorf("%v can't initialize the client, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
return nil, nil, false
}
logger.Log.Infof("%v can initialize the client", fmt.Sprintf(uiUtils.Green, "√"))
kubernetesVersion, err := kubernetesProvider.GetKubernetesVersion()
if err != nil {
logger.Log.Errorf("%v can't query the Kubernetes API, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
return nil, nil, false
}
logger.Log.Infof("%v can query the Kubernetes API", fmt.Sprintf(uiUtils.Green, "√"))
return kubernetesProvider, kubernetesVersion, true
}
func checkKubernetesVersion(kubernetesVersion *semver.SemVersion) bool {
logger.Log.Infof("\nkubernetes-version\n--------------------")
if err := kubernetes.ValidateKubernetesVersion(kubernetesVersion); err != nil {
logger.Log.Errorf("%v not running the minimum Kubernetes API version, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
return false
}
logger.Log.Infof("%v is running the minimum Kubernetes API version", fmt.Sprintf(uiUtils.Green, "√"))
return true
}
func checkServerConnection(kubernetesProvider *kubernetes.Provider) bool {
logger.Log.Infof("\nAPI-server-connectivity\n--------------------")
serverUrl := GetApiServerUrl(config.Config.Tap.GuiPort)
apiServerProvider := apiserver.NewProvider(serverUrl, 1, apiserver.DefaultTimeout)
if err := apiServerProvider.TestConnection(); err == nil {
logger.Log.Infof("%v found Mizu server tunnel available and connected successfully to API server", fmt.Sprintf(uiUtils.Green, "√"))
return true
}
connectedToApiServer := false
if err := checkProxy(serverUrl, kubernetesProvider); err != nil {
logger.Log.Errorf("%v couldn't connect to API server using proxy, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
} else {
connectedToApiServer = true
logger.Log.Infof("%v connected successfully to API server using proxy", fmt.Sprintf(uiUtils.Green, "√"))
}
if err := checkPortForward(serverUrl, kubernetesProvider); err != nil {
logger.Log.Errorf("%v couldn't connect to API server using port-forward, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
} else {
connectedToApiServer = true
logger.Log.Infof("%v connected successfully to API server using port-forward", fmt.Sprintf(uiUtils.Green, "√"))
}
return connectedToApiServer
}
func checkProxy(serverUrl string, kubernetesProvider *kubernetes.Provider) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
httpServer, err := kubernetes.StartProxy(kubernetesProvider, config.Config.Tap.ProxyHost, config.Config.Tap.GuiPort, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName, cancel)
if err != nil {
return err
}
apiServerProvider := apiserver.NewProvider(serverUrl, apiserver.DefaultRetries, apiserver.DefaultTimeout)
if err := apiServerProvider.TestConnection(); err != nil {
return err
}
if err := httpServer.Shutdown(ctx); err != nil {
logger.Log.Debugf("Error occurred while stopping proxy, err: %v", err)
}
return nil
}
func checkPortForward(serverUrl string, kubernetesProvider *kubernetes.Provider) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
podRegex, _ := regexp.Compile(kubernetes.ApiServerPodName)
forwarder, err := kubernetes.NewPortForward(kubernetesProvider, config.Config.MizuResourcesNamespace, podRegex, config.Config.Tap.GuiPort, ctx, cancel)
if err != nil {
return err
}
apiServerProvider := apiserver.NewProvider(serverUrl, apiserver.DefaultRetries, apiserver.DefaultTimeout)
if err := apiServerProvider.TestConnection(); err != nil {
return err
}
forwarder.Close()
return nil
}
func checkK8sResources(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool {
logger.Log.Infof("\nk8s-components\n--------------------")
exist, err := kubernetesProvider.DoesNamespaceExist(ctx, config.Config.MizuResourcesNamespace)
allResourcesExist := checkResourceExist(config.Config.MizuResourcesNamespace, "namespace", exist, err)
exist, err = kubernetesProvider.DoesConfigMapExist(ctx, config.Config.MizuResourcesNamespace, kubernetes.ConfigMapName)
allResourcesExist = checkResourceExist(kubernetes.ConfigMapName, "config map", exist, err) && allResourcesExist
exist, err = kubernetesProvider.DoesServiceAccountExist(ctx, config.Config.MizuResourcesNamespace, kubernetes.ServiceAccountName)
allResourcesExist = checkResourceExist(kubernetes.ServiceAccountName, "service account", exist, err) && allResourcesExist
if config.Config.IsNsRestrictedMode() {
exist, err = kubernetesProvider.DoesRoleExist(ctx, config.Config.MizuResourcesNamespace, kubernetes.RoleName)
allResourcesExist = checkResourceExist(kubernetes.RoleName, "role", exist, err) && allResourcesExist
exist, err = kubernetesProvider.DoesRoleBindingExist(ctx, config.Config.MizuResourcesNamespace, kubernetes.RoleBindingName)
allResourcesExist = checkResourceExist(kubernetes.RoleBindingName, "role binding", exist, err) && allResourcesExist
} else {
exist, err = kubernetesProvider.DoesClusterRoleExist(ctx, kubernetes.ClusterRoleName)
allResourcesExist = checkResourceExist(kubernetes.ClusterRoleName, "cluster role", exist, err) && allResourcesExist
exist, err = kubernetesProvider.DoesClusterRoleBindingExist(ctx, kubernetes.ClusterRoleBindingName)
allResourcesExist = checkResourceExist(kubernetes.ClusterRoleBindingName, "cluster role binding", exist, err) && allResourcesExist
}
exist, err = kubernetesProvider.DoesServiceExist(ctx, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName)
allResourcesExist = checkResourceExist(kubernetes.ApiServerPodName, "service", exist, err) && allResourcesExist
allResourcesExist = checkPodResourcesExist(ctx, kubernetesProvider) && allResourcesExist
return allResourcesExist
}
func checkPodResourcesExist(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool {
if pods, err := kubernetesProvider.ListPodsByAppLabel(ctx, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName); err != nil {
logger.Log.Errorf("%v error checking if '%v' pod is running, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), kubernetes.ApiServerPodName, err)
return false
} else if len(pods) == 0 {
logger.Log.Errorf("%v '%v' pod doesn't exist", fmt.Sprintf(uiUtils.Red, "✗"), kubernetes.ApiServerPodName)
return false
} else if !kubernetes.IsPodRunning(&pods[0]) {
logger.Log.Errorf("%v '%v' pod not running", fmt.Sprintf(uiUtils.Red, "✗"), kubernetes.ApiServerPodName)
return false
}
logger.Log.Infof("%v '%v' pod running", fmt.Sprintf(uiUtils.Green, "√"), kubernetes.ApiServerPodName)
if pods, err := kubernetesProvider.ListPodsByAppLabel(ctx, config.Config.MizuResourcesNamespace, kubernetes.TapperPodName); err != nil {
logger.Log.Errorf("%v error checking if '%v' pods are running, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), kubernetes.TapperPodName, err)
return false
} else {
tappers := 0
notRunningTappers := 0
for _, pod := range pods {
tappers += 1
if !kubernetes.IsPodRunning(&pod) {
notRunningTappers += 1
}
}
if notRunningTappers > 0 {
logger.Log.Errorf("%v '%v' %v/%v pods are not running", fmt.Sprintf(uiUtils.Red, "✗"), kubernetes.TapperPodName, notRunningTappers, tappers)
return false
}
logger.Log.Infof("%v '%v' %v pods running", fmt.Sprintf(uiUtils.Green, "√"), kubernetes.TapperPodName, tappers)
return true
}
}
func checkResourceExist(resourceName string, resourceType string, exist bool, err error) bool {
if err != nil {
logger.Log.Errorf("%v error checking if '%v' %v exists, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), resourceName, resourceType, err)
return false
} else if !exist {
logger.Log.Errorf("%v '%v' %v doesn't exist", fmt.Sprintf(uiUtils.Red, "✗"), resourceName, resourceType)
return false
}
logger.Log.Infof("%v '%v' %v exists", fmt.Sprintf(uiUtils.Green, "√"), resourceName, resourceType)
return true
}
func checkK8sTapPermissions(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool {
logger.Log.Infof("\nkubernetes-permissions\n--------------------")
var filePath string
if config.Config.IsNsRestrictedMode() {
filePath = "permissionFiles/permissions-ns-tap.yaml"
} else {
filePath = "permissionFiles/permissions-all-namespaces-tap.yaml"
}
data, err := embedFS.ReadFile(filePath)
if err != nil {
logger.Log.Errorf("%v error while checking kubernetes permissions, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
return false
}
obj, err := getDecodedObject(data)
if err != nil {
logger.Log.Errorf("%v error while checking kubernetes permissions, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
return false
}
var rules []rbac.PolicyRule
if config.Config.IsNsRestrictedMode() {
rules = obj.(*rbac.Role).Rules
} else {
rules = obj.(*rbac.ClusterRole).Rules
}
return checkPermissions(ctx, kubernetesProvider, rules)
}
func getDecodedObject(data []byte) (runtime.Object, error) {
decode := scheme.Codecs.UniversalDeserializer().Decode
obj, _, err := decode(data, nil, nil)
if err != nil {
return nil, err
}
return obj, nil
}
func checkPermissions(ctx context.Context, kubernetesProvider *kubernetes.Provider, rules []rbac.PolicyRule) bool {
permissionsExist := true
for _, rule := range rules {
for _, group := range rule.APIGroups {
for _, resource := range rule.Resources {
for _, verb := range rule.Verbs {
exist, err := kubernetesProvider.CanI(ctx, config.Config.MizuResourcesNamespace, resource, verb, group)
permissionsExist = checkPermissionExist(group, resource, verb, exist, err) && permissionsExist
}
}
}
}
return permissionsExist
}
func checkPermissionExist(group string, resource string, verb string, exist bool, err error) bool {
if err != nil {
logger.Log.Errorf("%v error checking permission for %v %v in group '%v', err: %v", fmt.Sprintf(uiUtils.Red, "✗"), verb, resource, group, err)
return false
} else if !exist {
logger.Log.Errorf("%v can't %v %v in group '%v'", fmt.Sprintf(uiUtils.Red, "✗"), verb, resource, group)
return false
}
logger.Log.Infof("%v can %v %v in group '%v'", fmt.Sprintf(uiUtils.Green, "√"), verb, resource, group)
return true
}
func checkImagePullInCluster(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool {
logger.Log.Infof("\nimage-pull-in-cluster\n--------------------")
podName := "image-pull-in-cluster"
defer removeImagePullInClusterResources(ctx, kubernetesProvider, podName)
if err := createImagePullInClusterResources(ctx, kubernetesProvider, podName); err != nil {
logger.Log.Errorf("%v error while creating image pull in cluster resources, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
return false
}
if err := checkImagePulled(ctx, kubernetesProvider, podName); err != nil {
logger.Log.Errorf("%v cluster is not able to pull mizu containers from docker hub, err: %v", fmt.Sprintf(uiUtils.Red, "✗"), err)
return false
}
logger.Log.Infof("%v cluster is able to pull mizu containers from docker hub", fmt.Sprintf(uiUtils.Green, "√"))
return true
}
func checkImagePulled(ctx context.Context, kubernetesProvider *kubernetes.Provider, podName string) error {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", podName))
podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
eventChan, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper)
timeAfter := time.After(30 * time.Second)
for {
select {
case wEvent, ok := <-eventChan:
if !ok {
eventChan = nil
continue
}
pod, err := wEvent.ToPod()
if err != nil {
return err
}
if pod.Status.Phase == core.PodRunning {
return nil
}
case err, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
return err
case <-timeAfter:
return fmt.Errorf("image not pulled in time")
}
}
}
func removeImagePullInClusterResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, podName string) {
if err := kubernetesProvider.RemovePod(ctx, config.Config.MizuResourcesNamespace, podName); err != nil {
logger.Log.Debugf("error while removing image pull in cluster resources, err: %v", err)
}
if !config.Config.IsNsRestrictedMode() {
if err := kubernetesProvider.RemoveNamespace(ctx, config.Config.MizuResourcesNamespace); err != nil {
logger.Log.Debugf("error while removing image pull in cluster resources, err: %v", err)
}
}
}
func createImagePullInClusterResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, podName string) error {
if !config.Config.IsNsRestrictedMode() {
if _, err := kubernetesProvider.CreateNamespace(ctx, config.Config.MizuResourcesNamespace); err != nil {
return err
}
}
var zero int64
pod := &core.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
},
Spec: core.PodSpec{
Containers: []core.Container{
{
Name: "probe",
Image: "up9inc/busybox",
ImagePullPolicy: "Always",
Command: []string{"cat"},
Stdin: true,
},
},
TerminationGracePeriodSeconds: &zero,
},
}
if _, err := kubernetesProvider.CreatePod(ctx, config.Config.MizuResourcesNamespace, pod); err != nil {
return err
}
return nil
}

View File

@@ -1,9 +1,11 @@
package configStructs
const (
PreTapCheckName = "pre-tap"
PreTapCheckName = "pre-tap"
ImagePullCheckName = "image-pull"
)
type CheckConfig struct {
PreTap bool `yaml:"pre-tap"`
PreTap bool `yaml:"pre-tap"`
ImagePull bool `yaml:"image-pull"`
}

View File

@@ -94,10 +94,6 @@ func (tapperSyncer *MizuTapperSyncer) watchTapperPods() {
continue
}
if tapperSyncer.startTime.After(pod.CreationTimestamp.Time) {
continue
}
logger.Log.Debugf("Watching tapper pods loop, tapper: %v, node: %v, status: %v", pod.Name, pod.Spec.NodeName, pod.Status.Phase)
if pod.Spec.NodeName != "" {
tapperStatus := shared.TapperStatus{TapperName: pod.Name, NodeName: pod.Spec.NodeName, Status: string(pod.Status.Phase)}
@@ -137,10 +133,6 @@ func (tapperSyncer *MizuTapperSyncer) watchTapperEvents() {
continue
}
if tapperSyncer.startTime.After(event.CreationTimestamp.Time) {
continue
}
logger.Log.Debugf(
fmt.Sprintf("Watching tapper events loop, event %s, time: %v, resource: %s (%s), reason: %s, note: %s",
event.Name,

View File

@@ -48,6 +48,16 @@ type Extension struct {
Dissector Dissector
}
type Capture string
const (
UndefinedCapture Capture = ""
Pcap Capture = "pcap"
Envoy Capture = "envoy"
Linkerd Capture = "linkerd"
Ebpf Capture = "ebpf"
)
type ConnectionInfo struct {
ClientIP string
ClientPort string
@@ -84,6 +94,7 @@ type RequestResponsePair struct {
// `Protocol` is modified in the later stages of data propagation. Therefore it's not a pointer.
type OutputChannelItem struct {
Protocol Protocol
Capture Capture
Timestamp int64
ConnectionInfo *ConnectionInfo
Pair *RequestResponsePair
@@ -102,7 +113,7 @@ type SuperIdentifier struct {
type Dissector interface {
Register(*Extension)
Ping()
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions, reqResMatcher RequestResponseMatcher) error
Dissect(b *bufio.Reader, capture Capture, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions, reqResMatcher RequestResponseMatcher) error
Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *Entry
Summarize(entry *Entry) *BaseEntry
Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error)
@@ -132,6 +143,7 @@ func (e *Emitting) Emit(item *OutputChannelItem) {
type Entry struct {
Id uint `json:"id"`
Protocol Protocol `json:"proto"`
Capture Capture `json:"capture"`
Source *TCP `json:"src"`
Destination *TCP `json:"dst"`
Namespace string `json:"namespace,omitempty"`
@@ -162,6 +174,7 @@ type EntryWrapper struct {
type BaseEntry struct {
Id uint `json:"id"`
Protocol Protocol `json:"proto,omitempty"`
Capture Capture `json:"capture"`
Summary string `json:"summary,omitempty"`
SummaryQuery string `json:"summaryQuery,omitempty"`
Status int `json:"status"`

View File

@@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect:
@mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect3/amqp/\* expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect4/amqp/\* expect

View File

@@ -94,7 +94,7 @@ type AMQPWrapper struct {
Details interface{} `json:"details"`
}
func emitAMQP(event interface{}, _type string, method string, connectionInfo *api.ConnectionInfo, captureTime time.Time, emitter api.Emitter) {
func emitAMQP(event interface{}, _type string, method string, connectionInfo *api.ConnectionInfo, captureTime time.Time, emitter api.Emitter, capture api.Capture) {
request := &api.GenericMessage{
IsRequest: true,
CaptureTime: captureTime,
@@ -108,6 +108,7 @@ func emitAMQP(event interface{}, _type string, method string, connectionInfo *ap
}
item := &api.OutputChannelItem{
Protocol: protocol,
Capture: capture,
Timestamp: captureTime.UnixNano() / int64(time.Millisecond),
ConnectionInfo: connectionInfo,
Pair: &api.RequestResponsePair{

View File

@@ -39,7 +39,7 @@ func (d dissecting) Ping() {
const amqpRequest string = "amqp_request"
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
r := AmqpReader{b}
var remaining int
@@ -113,11 +113,11 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
case *BasicPublish:
eventBasicPublish.Body = f.Body
superIdentifier.Protocol = &protocol
emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, superTimer.CaptureTime, emitter)
emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, superTimer.CaptureTime, emitter, capture)
case *BasicDeliver:
eventBasicDeliver.Body = f.Body
superIdentifier.Protocol = &protocol
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, superTimer.CaptureTime, emitter)
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, superTimer.CaptureTime, emitter, capture)
}
case *MethodFrame:
@@ -138,7 +138,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
Arguments: m.Arguments,
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter)
emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter, capture)
case *BasicConsume:
eventBasicConsume := &BasicConsume{
@@ -151,7 +151,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
Arguments: m.Arguments,
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter)
emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter, capture)
case *BasicDeliver:
eventBasicDeliver.ConsumerTag = m.ConsumerTag
@@ -171,7 +171,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
Arguments: m.Arguments,
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter)
emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter, capture)
case *ExchangeDeclare:
eventExchangeDeclare := &ExchangeDeclare{
@@ -185,7 +185,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
Arguments: m.Arguments,
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter)
emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter, capture)
case *ConnectionStart:
eventConnectionStart := &ConnectionStart{
@@ -196,7 +196,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
Locales: m.Locales,
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter)
emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter, capture)
case *ConnectionClose:
eventConnectionClose := &ConnectionClose{
@@ -206,7 +206,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
MethodId: m.MethodId,
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, superTimer.CaptureTime, emitter)
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, superTimer.CaptureTime, emitter, capture)
}
default:
@@ -222,6 +222,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
reqDetails["method"] = request["method"]
return &api.Entry{
Protocol: protocol,
Capture: item.Capture,
Source: &api.TCP{
Name: resolvedSource,
IP: item.ConnectionInfo.ClientIP,
@@ -283,6 +284,7 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
return &api.BaseEntry{
Id: entry.Id,
Protocol: entry.Protocol,
Capture: entry.Capture,
Summary: summary,
SummaryQuery: summaryQuery,
Status: 0,

View File

@@ -122,7 +122,7 @@ func TestDissect(t *testing.T) {
DstPort: "2",
}
reqResMatcher := dissector.NewResponseRequestMatcher()
err = dissector.Dissect(bufferClient, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
err = dissector.Dissect(bufferClient, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err)
}
@@ -140,7 +140,7 @@ func TestDissect(t *testing.T) {
SrcPort: "2",
DstPort: "1",
}
err = dissector.Dissect(bufferServer, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
err = dissector.Dissect(bufferServer, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err)
}

View File

@@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect:
@mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect3/http/\* expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect4/http/\* expect

View File

@@ -47,7 +47,7 @@ func replaceForwardedFor(item *api.OutputChannelItem) {
item.ConnectionInfo.ClientPort = ""
}
func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) error {
func handleHTTP2Stream(http2Assembler *Http2Assembler, capture api.Capture, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) error {
streamID, messageHTTP1, isGrpc, err := http2Assembler.readMessage()
if err != nil {
return err
@@ -104,13 +104,14 @@ func handleHTTP2Stream(http2Assembler *Http2Assembler, tcpID *api.TcpID, superTi
} else {
item.Protocol = http2Protocol
}
item.Capture = capture
filterAndEmit(item, emitter, options)
}
return nil
}
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, req *http.Request, err error) {
func handleHTTP1ClientStream(b *bufio.Reader, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, req *http.Request, err error) {
req, err = http.ReadRequest(b)
if err != nil {
return
@@ -147,12 +148,13 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
ServerPort: tcpID.DstPort,
IsOutgoing: true,
}
item.Capture = capture
filterAndEmit(item, emitter, options)
}
return
}
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, err error) {
func handleHTTP1ServerStream(b *bufio.Reader, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher *requestResponseMatcher) (switchingProtocolsHTTP2 bool, err error) {
var res *http.Response
res, err = http.ReadResponse(b, nil)
if err != nil {
@@ -190,6 +192,7 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
ServerPort: tcpID.SrcPort,
IsOutgoing: false,
}
item.Capture = capture
filterAndEmit(item, emitter, options)
}
return

View File

@@ -86,7 +86,7 @@ func (d dissecting) Ping() {
log.Printf("pong %s", http11protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher)
var err error
@@ -121,7 +121,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
if isHTTP2 {
err = handleHTTP2Stream(http2Assembler, tcpID, superTimer, emitter, options, reqResMatcher)
err = handleHTTP2Stream(http2Assembler, capture, tcpID, superTimer, emitter, options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@@ -130,7 +130,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
superIdentifier.Protocol = &http11protocol
} else if isClient {
var req *http.Request
switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter, options, reqResMatcher)
switchingProtocolsHTTP2, req, err = handleHTTP1ClientStream(b, capture, tcpID, counterPair, superTimer, emitter, options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@@ -157,11 +157,12 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
ServerPort: tcpID.DstPort,
IsOutgoing: true,
}
item.Capture = capture
filterAndEmit(item, emitter, options)
}
}
} else {
switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter, options, reqResMatcher)
switchingProtocolsHTTP2, err = handleHTTP1ServerStream(b, capture, tcpID, counterPair, superTimer, emitter, options, reqResMatcher)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
@@ -259,6 +260,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
httpPair, _ := json.Marshal(item.Pair)
return &api.Entry{
Protocol: item.Protocol,
Capture: item.Capture,
Source: &api.TCP{
Name: resolvedSource,
IP: item.ConnectionInfo.ClientIP,
@@ -291,6 +293,7 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
return &api.BaseEntry{
Id: entry.Id,
Protocol: entry.Protocol,
Capture: entry.Capture,
Summary: summary,
SummaryQuery: summaryQuery,
Status: status,

View File

@@ -124,7 +124,7 @@ func TestDissect(t *testing.T) {
DstPort: "2",
}
reqResMatcher := dissector.NewResponseRequestMatcher()
err = dissector.Dissect(bufferClient, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
err = dissector.Dissect(bufferClient, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err)
}
@@ -142,7 +142,7 @@ func TestDissect(t *testing.T) {
SrcPort: "2",
DstPort: "1",
}
err = dissector.Dissect(bufferServer, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
err = dissector.Dissect(bufferServer, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err)
}

View File

@@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect:
@mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect3/kafka/\* expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect4/kafka/\* expect

View File

@@ -35,7 +35,7 @@ func (d dissecting) Ping() {
log.Printf("pong %s", _protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher)
for {
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &_protocol {
@@ -49,7 +49,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
superIdentifier.Protocol = &_protocol
} else {
err := ReadResponse(b, tcpID, counterPair, superTimer, emitter, reqResMatcher)
err := ReadResponse(b, capture, tcpID, counterPair, superTimer, emitter, reqResMatcher)
if err != nil {
return err
}
@@ -68,6 +68,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
}
return &api.Entry{
Protocol: _protocol,
Capture: item.Capture,
Source: &api.TCP{
Name: resolvedSource,
IP: item.ConnectionInfo.ClientIP,
@@ -190,6 +191,7 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
return &api.BaseEntry{
Id: entry.Id,
Protocol: entry.Protocol,
Capture: entry.Capture,
Summary: summary,
SummaryQuery: summaryQuery,
Status: status,

View File

@@ -123,7 +123,7 @@ func TestDissect(t *testing.T) {
}
reqResMatcher := dissector.NewResponseRequestMatcher()
reqResMatcher.SetMaxTry(10)
err = dissector.Dissect(bufferClient, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
err = dissector.Dissect(bufferClient, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
log.Println(err)
}
@@ -141,7 +141,7 @@ func TestDissect(t *testing.T) {
SrcPort: "2",
DstPort: "1",
}
err = dissector.Dissect(bufferServer, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
err = dissector.Dissect(bufferServer, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
log.Println(err)
}

View File

@@ -16,7 +16,7 @@ type Response struct {
CaptureTime time.Time `json:"captureTime"`
}
func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, reqResMatcher *requestResponseMatcher) (err error) {
func ReadResponse(r io.Reader, capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, reqResMatcher *requestResponseMatcher) (err error) {
d := &decoder{reader: r, remain: 4}
size := d.readInt32()
@@ -258,6 +258,7 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, s
item := &api.OutputChannelItem{
Protocol: _protocol,
Capture: capture,
Timestamp: reqResPair.Request.CaptureTime.UnixNano() / int64(time.Millisecond),
ConnectionInfo: connectionInfo,
Pair: &api.RequestResponsePair{

View File

@@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect:
@mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect3/redis/\* expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect4/redis/\* expect

View File

@@ -6,7 +6,7 @@ import (
"github.com/up9inc/mizu/tap/api"
)
func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket, reqResMatcher *requestResponseMatcher) error {
func handleClientStream(capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, request *RedisPacket, reqResMatcher *requestResponseMatcher) error {
counterPair.Lock()
counterPair.Request++
requestCounter := counterPair.Request
@@ -23,6 +23,7 @@ func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTim
item := reqResMatcher.registerRequest(ident, request, superTimer.CaptureTime)
if item != nil {
item.Capture = capture
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.SrcIP,
ClientPort: tcpID.SrcPort,
@@ -35,7 +36,7 @@ func handleClientStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTim
return nil
}
func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket, reqResMatcher *requestResponseMatcher) error {
func handleServerStream(capture api.Capture, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter, response *RedisPacket, reqResMatcher *requestResponseMatcher) error {
counterPair.Lock()
counterPair.Response++
responseCounter := counterPair.Response
@@ -52,6 +53,7 @@ func handleServerStream(tcpID *api.TcpID, counterPair *api.CounterPair, superTim
item := reqResMatcher.registerResponse(ident, response, superTimer.CaptureTime)
if item != nil {
item.Capture = capture
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.DstIP,
ClientPort: tcpID.DstPort,

View File

@@ -34,7 +34,7 @@ func (d dissecting) Ping() {
log.Printf("pong %s", protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
func (d dissecting) Dissect(b *bufio.Reader, capture api.Capture, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions, _reqResMatcher api.RequestResponseMatcher) error {
reqResMatcher := _reqResMatcher.(*requestResponseMatcher)
is := &RedisInputStream{
Reader: b,
@@ -48,9 +48,9 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
if isClient {
err = handleClientStream(tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher)
err = handleClientStream(capture, tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher)
} else {
err = handleServerStream(tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher)
err = handleServerStream(capture, tcpID, counterPair, superTimer, emitter, redisPacket, reqResMatcher)
}
if err != nil {
@@ -71,6 +71,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
}
return &api.Entry{
Protocol: protocol,
Capture: item.Capture,
Source: &api.TCP{
Name: resolvedSource,
IP: item.ConnectionInfo.ClientIP,
@@ -113,6 +114,7 @@ func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
return &api.BaseEntry{
Id: entry.Id,
Protocol: entry.Protocol,
Capture: entry.Capture,
Summary: summary,
SummaryQuery: summaryQuery,
Status: status,

View File

@@ -123,7 +123,7 @@ func TestDissect(t *testing.T) {
DstPort: "2",
}
reqResMatcher := dissector.NewResponseRequestMatcher()
err = dissector.Dissect(bufferClient, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
err = dissector.Dissect(bufferClient, api.Pcap, true, tcpIDClient, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && reflect.TypeOf(err) != reflect.TypeOf(&ConnectError{}) && err != io.EOF && err != io.ErrUnexpectedEOF {
log.Println(err)
}
@@ -141,7 +141,7 @@ func TestDissect(t *testing.T) {
SrcPort: "2",
DstPort: "1",
}
err = dissector.Dissect(bufferServer, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
err = dissector.Dissect(bufferServer, api.Pcap, false, tcpIDServer, counterPair, &api.SuperTimer{}, superIdentifier, emitter, options, reqResMatcher)
if err != nil && reflect.TypeOf(err) != reflect.TypeOf(&ConnectError{}) && err != io.EOF && err != io.ErrUnexpectedEOF {
log.Println(err)
}

View File

@@ -95,7 +95,8 @@ func (h *tcpReader) Close() {
func (h *tcpReader) run(wg *sync.WaitGroup) {
defer wg.Done()
b := bufio.NewReader(h)
err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter, filteringOptions, h.reqResMatcher)
// TODO: Add api.Pcap, api.Envoy and api.Linkerd distinction by refactoring NewPacketSourceManager method
err := h.extension.Dissector.Dissect(b, api.Pcap, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter, filteringOptions, h.reqResMatcher)
if err != nil {
_, err = io.Copy(ioutil.Discard, b)
if err != nil {

View File

@@ -158,7 +158,7 @@ func dissect(extension *api.Extension, reader *tlsReader, isRequest bool, tcpid
emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher api.RequestResponseMatcher) {
b := bufio.NewReader(reader)
err := extension.Dissector.Dissect(b, isRequest, tcpid, &api.CounterPair{},
err := extension.Dissector.Dissect(b, api.Ebpf, isRequest, tcpid, &api.CounterPair{},
&api.SuperTimer{}, &api.SuperIdentifier{}, emitter, options, reqResMatcher)
if err != nil {

View File

@@ -5,10 +5,10 @@ import Api, {getWebsocketUrl} from "./api";
const api = Api.getInstance()
const App = () => {
const {message,error,isOpen, openSocket, closeSocket, sendQuery} = useWS(getWebsocketUrl())
const trafficViewerApi = {...api, webSocket:{open : openSocket, close: closeSocket, sendQuery: sendQuery}}
sendQuery(DEFAULT_QUERY);
const App = () => {
const {message,error,isOpen, openSocket, closeSocket, sendQueryWhenWsOpen} = useWS(getWebsocketUrl())
const trafficViewerApi = {...api, webSocket:{open : openSocket, close: closeSocket, sendQueryWhenWsOpen: sendQueryWhenWsOpen}}
sendQueryWhenWsOpen(DEFAULT_QUERY);
useEffect(() => {
return () =>{

View File

@@ -1,6 +1,6 @@
{
"name": "@up9/mizu-common",
"version": "1.0.125",
"version": "1.0.128",
"description": "Made with create-react-library",
"author": "",
"license": "MIT",
@@ -30,7 +30,6 @@
"react":"^17.0.2",
"react-dom": "^17.0.2",
"recoil": "^0.5.2",
"react-copy-to-clipboard": "^5.0.3",
"@types/jest": "^26.0.22",
"@types/node": "^12.20.10"

View File

@@ -62,6 +62,13 @@
width: 185px
text-align: left
.capture
margin-top: -60px
.capture img
height: 20px
z-index: 1000
.endpointServiceContainer
display: flex
flex-direction: column

View File

@@ -4,6 +4,7 @@ import SwapHorizIcon from '@material-ui/icons/SwapHoriz';
import styles from './EntryListItem.module.sass';
import StatusCode, {getClassification, StatusCodeClassification} from "../../UI/StatusCode";
import Protocol, {ProtocolInterface} from "../../UI/Protocol"
import eBPFLogo from '../assets/ebpf.png';
import {Summary} from "../../UI/Summary";
import Queryable from "../../UI/Queryable";
import ingoingIconSuccess from "assets/ingoing-traffic-success.svg"
@@ -24,6 +25,7 @@ interface TCPInterface {
interface Entry {
proto: ProtocolInterface,
capture: string,
method?: string,
methodQuery?: string,
summary: string,
@@ -52,6 +54,14 @@ interface EntryProps {
headingMode: boolean;
}
enum CaptureTypes {
UndefinedCapture = "",
Pcap = "pcap",
Envoy = "envoy",
Linkerd = "linkerd",
Ebpf = "ebpf",
}
export const EntryItem: React.FC<EntryProps> = ({entry, style, headingMode}) => {
const [focusedEntryId, setFocusedEntryId] = useRecoilState(focusedEntryIdAtom);
@@ -154,6 +164,17 @@ export const EntryItem: React.FC<EntryProps> = ({entry, style, headingMode}) =>
protocol={entry.proto}
horizontal={false}
/> : null}
{/* TODO: Update the code below once we have api.Pcap, api.Envoy and api.Linkerd distinction in the backend */}
{entry.capture === CaptureTypes.Ebpf ? <div className={styles.capture}>
<Queryable
query={`capture == "${entry.capture}"`}
displayIconOnMouseOver={true}
flipped={false}
style={{position: "absolute"}}
>
<img src={eBPFLogo} alt="eBPF"/>
</Queryable>
</div> : null}
{isStatusCodeEnabled && <div>
<StatusCode statusCode={entry.status} statusQuery={entry.statusQuery}/>
</div>}

View File

@@ -8,7 +8,8 @@ import { EntryDetailed } from "./EntryDetailed";
import playIcon from 'assets/run.svg';
import pauseIcon from 'assets/pause.svg';
import variables from '../../variables.module.scss';
import { toast } from 'react-toastify';
import { toast,ToastContainer } from 'react-toastify';
import 'react-toastify/dist/ReactToastify.css';
import debounce from 'lodash/debounce';
import { RecoilRoot, RecoilState, useRecoilState, useRecoilValue, useSetRecoilState } from "recoil";
import entriesAtom from "../../recoil/entries";
@@ -21,7 +22,6 @@ import TrafficViewerApi from "./TrafficViewerApi";
import { StatusBar } from "../UI/StatusBar";
import tappingStatusAtom from "../../recoil/tappingStatus/atom";
const useLayoutStyles = makeStyles(() => ({
details: {
flex: "0 0 50%",
@@ -45,26 +45,22 @@ const useLayoutStyles = makeStyles(() => ({
interface TrafficViewerProps {
setAnalyzeStatus?: (status: any) => void;
api?: any
message?: {}
error?: {}
isWebSocketOpen: boolean
trafficViewerApiProp: TrafficViewerApi,
actionButtons?: JSX.Element,
isShowStatusBar?: boolean
isShowStatusBar?: boolean,
webSocketUrl : string
}
const TrafficViewer: React.FC<TrafficViewerProps> = ({ setAnalyzeStatus, message, error, isWebSocketOpen, trafficViewerApiProp, actionButtons, isShowStatusBar }) => {
export const TrafficViewer : React.FC<TrafficViewerProps> = ({setAnalyzeStatus, trafficViewerApiProp, actionButtons,isShowStatusBar,webSocketUrl}) => {
const classes = useLayoutStyles();
const [entries, setEntries] = useRecoilState(entriesAtom);
const [focusedEntryId, setFocusedEntryId] = useRecoilState(focusedEntryIdAtom);
const [wsConnection, setWsConnection] = useRecoilState(websocketConnectionAtom);
const query = useRecoilValue(queryAtom);
const [queryToSend, setQueryToSend] = useState("")
const setTrafficViewerApiState = useSetRecoilState(trafficViewerApiAtom as RecoilState<TrafficViewerApi>)
const [tappingStatus, setTappingStatus] = useRecoilState(tappingStatusAtom);
const [noMoreDataTop, setNoMoreDataTop] = useState(false);
const [isSnappedToBottom, setIsSnappedToBottom] = useState(true);
@@ -107,6 +103,7 @@ const TrafficViewer: React.FC<TrafficViewerProps> = ({ setAnalyzeStatus, message
handleQueryChange(query);
}, [query, handleQueryChange]);
const ws = useRef(null);
const listEntry = useRef(null);
const openWebSocket = (query: string, resetEntries: boolean) => {
@@ -117,98 +114,99 @@ const TrafficViewer: React.FC<TrafficViewerProps> = ({ setAnalyzeStatus, message
setLeftOffTop(null);
setNoMoreDataTop(false);
}
setQueryToSend(query)
trafficViewerApiProp.webSocket.open();
}
const onmessage = useCallback((e) => {
if (!e?.data) return;
const message = JSON.parse(e.data);
switch (message.messageType) {
case "entry":
const entry = message.data;
if (!focusedEntryId) setFocusedEntryId(entry.id.toString());
const newEntries = [...entries, entry];
if (newEntries.length === 10001) {
setLeftOffTop(newEntries[0].entry.id);
newEntries.shift();
setNoMoreDataTop(false);
}
setEntries(newEntries);
break;
case "status":
setTappingStatus(message.tappingStatus);
break;
case "analyzeStatus":
setAnalyzeStatus(message.analyzeStatus);
break;
case "outboundLink":
onTLSDetected(message.Data.DstIP);
break;
case "toast":
toast[message.data.type](message.data.text, {
position: "bottom-right",
theme: "colored",
autoClose: message.data.autoClose,
hideProgressBar: false,
closeOnClick: true,
pauseOnHover: true,
draggable: true,
progress: undefined,
});
break;
case "queryMetadata":
setQueriedCurrent(queriedCurrent + message.data.current);
setQueriedTotal(message.data.total);
setLeftOffBottom(message.data.leftOff);
setTruncatedTimestamp(message.data.truncatedTimestamp);
if (leftOffTop === null) {
setLeftOffTop(message.data.leftOff - 1);
}
break;
case "startTime":
setStartTime(message.data);
break;
default:
console.error(
`unsupported websocket message type, Got: ${message.messageType}`
);
ws.current = new WebSocket(webSocketUrl);
ws.current.onopen = () => {
setWsConnection(WsConnectionStatus.Connected);
sendQueryWhenWsOpen(query);
}
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [message]);
useEffect(() => {
onmessage(message)
}, [message, onmessage])
useEffect(() => {
onerror(error)
}, [error])
useEffect(() => {
isWebSocketOpen ? setWsConnection(WsConnectionStatus.Connected) : setWsConnection(WsConnectionStatus.Closed)
trafficViewerApiProp.webSocket.sendQuery(queryToSend)
}, [isWebSocketOpen, queryToSend, setWsConnection])
const onerror = (event) => {
console.error("WebSocket error:", event);
if (query) {
openWebSocket(`(${query}) and leftOff(${leftOffBottom})`, false);
} else {
openWebSocket(`leftOff(${leftOffBottom})`, false);
ws.current.onclose = () => {
setWsConnection(WsConnectionStatus.Closed);
}
ws.current.onerror = (event) => {
console.error("WebSocket error:", event);
if (query) {
openWebSocket(`(${query}) and leftOff(${leftOffBottom})`, false);
} else {
openWebSocket(`leftOff(${leftOffBottom})`, false);
}
}
}
const sendQueryWhenWsOpen = (query) => {
setTimeout(() => {
if (ws?.current?.readyState === WebSocket.OPEN) {
ws.current.send(JSON.stringify({"query": query, "enableFullEntries": false}));
} else {
sendQueryWhenWsOpen(query);
}
}, 500)
}
if (ws.current) {
ws.current.onmessage = (e) => {
if (!e?.data) return;
const message = JSON.parse(e.data);
switch (message.messageType) {
case "entry":
const entry = message.data;
if (!focusedEntryId) setFocusedEntryId(entry.id.toString());
const newEntries = [...entries, entry];
if (newEntries.length === 10001) {
setLeftOffTop(newEntries[0].entry.id);
newEntries.shift();
setNoMoreDataTop(false);
}
setEntries(newEntries);
break;
case "status":
setTappingStatus(message.tappingStatus);
break;
case "analyzeStatus":
setAnalyzeStatus(message.analyzeStatus);
break;
case "outboundLink":
onTLSDetected(message.Data.DstIP);
break;
case "toast":
toast[message.data.type](message.data.text, {
position: "bottom-right",
theme: "colored",
autoClose: message.data.autoClose,
hideProgressBar: false,
closeOnClick: true,
pauseOnHover: true,
draggable: true,
progress: undefined,
});
break;
case "queryMetadata":
setQueriedCurrent(queriedCurrent + message.data.current);
setQueriedTotal(message.data.total);
setLeftOffBottom(message.data.leftOff);
setTruncatedTimestamp(message.data.truncatedTimestamp);
if (leftOffTop === null) {
setLeftOffTop(message.data.leftOff - 1);
}
break;
case "startTime":
setStartTime(message.data);
break;
default:
console.error(
`unsupported websocket message type, Got: ${message.messageType}`
);
}
};
}
useEffect(() => {
setTrafficViewerApiState({...trafficViewerApiProp, webSocket : {close : () => ws.current.close()}});
(async () => {
setTrafficViewerApiState(trafficViewerApiProp)
openWebSocket("leftOff(-1)", true);
try {
try{
const tapStatusResponse = await trafficViewerApiProp.tapStatus();
setTappingStatus(tapStatusResponse);
if (setAnalyzeStatus) {
if(setAnalyzeStatus) {
const analyzeStatusResponse = await trafficViewerApiProp.analyzeStatus();
setAnalyzeStatus(analyzeStatusResponse);
}
@@ -220,8 +218,8 @@ const TrafficViewer: React.FC<TrafficViewerProps> = ({ setAnalyzeStatus, message
}, []);
const toggleConnection = () => {
if (wsConnection === WsConnectionStatus.Closed) {
ws.current.close();
if (wsConnection !== WsConnectionStatus.Connected) {
if (query) {
openWebSocket(`(${query}) and leftOff(-1)`, true);
} else {
@@ -230,10 +228,6 @@ const TrafficViewer: React.FC<TrafficViewerProps> = ({ setAnalyzeStatus, message
scrollableRef.current.jumpToBottom();
setIsSnappedToBottom(true);
}
else if (wsConnection === WsConnectionStatus.Connected) {
trafficViewerApiProp.webSocket.close()
setWsConnection(WsConnectionStatus.Closed);
}
}
const onTLSDetected = (destAddress: string) => {
@@ -270,7 +264,7 @@ const TrafficViewer: React.FC<TrafficViewerProps> = ({ setAnalyzeStatus, message
const onSnapBrokenEvent = () => {
setIsSnappedToBottom(false);
if (wsConnection === WsConnectionStatus.Connected) {
trafficViewerApiProp.webSocket.close()
ws.current.close();
}
}
@@ -330,16 +324,17 @@ const TrafficViewer: React.FC<TrafficViewerProps> = ({ setAnalyzeStatus, message
setAddressesWithTLS={setAddressesWithTLS}
userDismissedTLSWarning={userDismissedTLSWarning}
setUserDismissedTLSWarning={setUserDismissedTLSWarning} />
<ToastContainer/>
</div>
);
};
const MemoiedTrafficViewer = React.memo(TrafficViewer)
const TrafficViewerContainer: React.FC<TrafficViewerProps> = ({ setAnalyzeStatus, message, isWebSocketOpen, trafficViewerApiProp, actionButtons, isShowStatusBar = true }) => {
const TrafficViewerContainer: React.FC<TrafficViewerProps> = ({ setAnalyzeStatus, trafficViewerApiProp, actionButtons, isShowStatusBar = true ,webSocketUrl}) => {
return <RecoilRoot>
<MemoiedTrafficViewer message={message} isWebSocketOpen={isWebSocketOpen} actionButtons={actionButtons} isShowStatusBar={isShowStatusBar}
<MemoiedTrafficViewer actionButtons={actionButtons} isShowStatusBar={isShowStatusBar} webSocketUrl={webSocketUrl}
trafficViewerApiProp={trafficViewerApiProp} setAnalyzeStatus={setAnalyzeStatus} />
</RecoilRoot>
}
export default TrafficViewerContainer
export default TrafficViewerContainer

View File

@@ -6,10 +6,8 @@ type TrafficViewerApi = {
getEntry : (entryId : any, query:string) => any
getRecentTLSLinks : () => any,
webSocket : {
open : () => {},
close : () => {},
sendQuery : (query:string) => {}
close : () => {}
}
}
export default TrafficViewerApi
export default TrafficViewerApi

Binary file not shown.

After

Width:  |  Height:  |  Size: 21 KiB

View File

@@ -46,6 +46,7 @@ const Protocol: React.FC<ProtocolProps> = ({protocol, horizontal}) => {
displayIconOnMouseOver={true}
flipped={false}
iconStyle={{marginTop: "52px", marginRight: "10px", zIndex: 1000}}
tooltipStyle={{marginTop: "-22px", zIndex: 1001}}
>
<span
className={`${styles.base} ${styles.vertical}`}

View File

@@ -11,11 +11,12 @@ interface Props {
iconStyle?: object,
className?: string,
useTooltip?: boolean,
tooltipStyle?: object,
displayIconOnMouseOver?: boolean,
flipped?: boolean,
}
const Queryable: React.FC<Props> = ({query, style, iconStyle, className, useTooltip= true, displayIconOnMouseOver = false, flipped = false, children}) => {
const Queryable: React.FC<Props> = ({query, style, iconStyle, className, useTooltip = true, tooltipStyle = null, displayIconOnMouseOver = false, flipped = false, children}) => {
const [showAddedNotification, setAdded] = useState(false);
const [showTooltip, setShowTooltip] = useState(false);
const [queryState, setQuery] = useRecoilState(queryAtom);
@@ -48,12 +49,12 @@ const Queryable: React.FC<Props> = ({query, style, iconStyle, className, useTool
</CopyToClipboard> : null;
return (
<div className={`${QueryableStyle.QueryableContainer} ${QueryableStyle.displayIconOnMouseOver} ${className ? className : ''} ${displayIconOnMouseOver ? QueryableStyle.displayIconOnMouseOver : ''}`}
<div className={`${QueryableStyle.QueryableContainer} ${QueryableStyle.displayIconOnMouseOver} ${className ? className : ''} ${displayIconOnMouseOver ? QueryableStyle.displayIconOnMouseOver : ''}`}
style={style} onMouseOver={ e => setShowTooltip(true)} onMouseLeave={ e => setShowTooltip(false)}>
{flipped && addButton}
{children}
{!flipped && addButton}
{useTooltip && showTooltip && <span className={QueryableStyle.QueryableTooltip}>{query}</span>}
{useTooltip && showTooltip && <span data-cy={"QueryableTooltip"} className={QueryableStyle.QueryableTooltip} style={tooltipStyle}>{query}</span>}
</div>
);
};

View File

@@ -11,15 +11,14 @@ const pluralize = (noun: string, amount: number) => {
}
export const StatusBar = () => {
const tappingStatus = useRecoilValue(tappingStatusAtom);
const [expandedBar, setExpandedBar] = useState(false);
const {uniqueNamespaces, amountOfPods, amountOfTappedPods, amountOfUntappedPods} = useRecoilValue(tappingStatusDetails);
return <div className={`${style.statusBar} ${(expandedBar ? `${style.expandedStatusBar}` : "")}`} onMouseOver={() => setExpandedBar(true)} onMouseLeave={() => setExpandedBar(false)}>
return <div className={`${style.statusBar} ${(expandedBar ? `${style.expandedStatusBar}` : "")}`} onMouseOver={() => setExpandedBar(true)} onMouseLeave={() => setExpandedBar(false)} data-cy="expandedStatusBar">
<div className={style.podsCount}>
{tappingStatus.some(pod => !pod.isTapped) && <img src={warningIcon} alt="warning"/>}
<span className={style.podsCountText}>
<span className={style.podsCountText} data-cy="podsCountText">
{`Tapping ${amountOfUntappedPods > 0 ? amountOfTappedPods + " / " + amountOfPods : amountOfPods} ${pluralize('pod', amountOfPods)} in ${pluralize('namespace', uniqueNamespaces.length)} ${uniqueNamespaces.join(", ")}`}
</span>
</div>

View File

@@ -18,7 +18,9 @@ const useWS = (wsUrl: string) => {
const onMessage = (e) => { setMessage(e) }
const onError = (e) => setError(e)
const onOpen = () => { setisOpen(true) }
const onClose = () => setisOpen(false)
const onClose = () => {
setisOpen(false)
}
const openSocket = () => {
ws.current = new WebSocket(wsUrl)
@@ -36,13 +38,17 @@ const useWS = (wsUrl: string) => {
ws.current.removeEventListener("close", onClose)
}
const sendQuery = (query: string) => {
if (ws.current && (ws.current.readyState === WebSocketReadyState.OPEN)) {
ws.current.send(JSON.stringify({ "query": query, "enableFullEntries": false }));
}
const sendQueryWhenWsOpen = (query) => {
setTimeout(() => {
if (ws?.current?.readyState === WebSocket.OPEN) {
ws.current.send(JSON.stringify({"query": query, "enableFullEntries": false}));
} else {
sendQueryWhenWsOpen(query);
}
}, 500)
}
return { message, error, isOpen, openSocket, closeSocket, sendQuery }
return { message, error, isOpen, openSocket, closeSocket, sendQueryWhenWsOpen }
}
export default useWS
export default useWS

View File

@@ -13,7 +13,7 @@
"@types/jest": "^26.0.22",
"@types/node": "^12.20.10",
"@uiw/react-textarea-code-editor": "^1.4.12",
"@up9/mizu-common": "1.0.125",
"@up9/mizu-common": "^1.0.128",
"axios": "^0.25.0",
"core-js": "^3.20.2",
"craco-babel-loader": "^1.0.3",

View File

@@ -1,11 +1,11 @@
import React, { useEffect } from "react";
import React, {useEffect} from "react";
import { Button } from "@material-ui/core";
import Api, {getWebsocketUrl} from "../../../helpers/api";
import Api,{getWebsocketUrl} from "../../../helpers/api";
import debounce from 'lodash/debounce';
import {useSetRecoilState, useRecoilState} from "recoil";
import {useCommonStyles} from "../../../helpers/commonStyle"
import serviceMapModalOpenAtom from "../../../recoil/serviceMapModalOpen";
import TrafficViewer ,{useWS,DEFAULT_QUERY} from "@up9/mizu-common"
import TrafficViewer from "@up9/mizu-common"
import "@up9/mizu-common/dist/index.css"
import oasModalOpenAtom from "../../../recoil/oasModalOpen/atom";
import serviceMap from "../../assets/serviceMap.svg";
@@ -22,11 +22,10 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus}) => {
const setServiceMapModalOpen = useSetRecoilState(serviceMapModalOpenAtom);
const [openOasModal, setOpenOasModal] = useRecoilState(oasModalOpenAtom);
const {message,error,isOpen, openSocket, closeSocket, sendQuery} = useWS(getWebsocketUrl())
const trafficViewerApi = {...api, webSocket:{open : openSocket, close: closeSocket, sendQuery: sendQuery}}
const trafficViewerApi = {...api}
const handleOpenOasModal = () => {
closeSocket()
//closeSocket() -- Todo: Add Close webSocket
setOpenOasModal(true);
}
@@ -56,18 +55,16 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus}) => {
</Button>}
</div>
sendQuery(DEFAULT_QUERY);
useEffect(() => {
return () => {
closeSocket()
//closeSocket()
}
},[])
return (
<>
<TrafficViewer setAnalyzeStatus={setAnalyzeStatus} message={message} error={error} isWebSocketOpen={isOpen}
<TrafficViewer setAnalyzeStatus={setAnalyzeStatus} webSocketUrl={getWebsocketUrl()}
trafficViewerApiProp={trafficViewerApi} actionButtons={actionButtons} isShowStatusBar={!openOasModal}/>
</>
);
};
};

Binary file not shown.

After

Width:  |  Height:  |  Size: 21 KiB