Compare commits

..

6 Commits

Author SHA1 Message Date
lirazyehezkel
45611c4c13 TRA-4477 FE holds limit of 10000 entries (#987)
* FE holds limit of 10000 entries

* let to const
2022-04-11 15:04:42 +03:00
RoyUP9
bb425fa6e2 Fixed service map unresolved bug (#986) 2022-04-11 14:37:53 +03:00
lirazyehezkel
4bc83ebcb5 Fix WS error when switching from settings to traffic viewer (#985) 2022-04-11 14:21:31 +03:00
M. Mert Yıldıran
bbb44dae79 Fix the unit tests of protocol extensions (#982) 2022-04-09 06:56:09 -07:00
M. Mert Yıldıran
72a1aba3e5 TRA-4410 Display namespace field in the UI (#974) 2022-04-08 21:16:25 +03:00
RoyUP9
d8fb8ff710 Fix for OAS reset not working (#978) 2022-04-07 18:14:03 +03:00
17 changed files with 92 additions and 47 deletions

View File

@@ -15,6 +15,7 @@ require (
github.com/go-playground/validator/v10 v10.10.0
github.com/google/uuid v1.3.0
github.com/gorilla/websocket v1.4.2
github.com/jinzhu/copier v0.3.5
github.com/nav-inc/datetime v0.1.3
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
github.com/orcaman/concurrent-map v1.0.0

View File

@@ -428,6 +428,8 @@ github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg=
github.com/jinzhu/copier v0.3.5/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=

View File

@@ -199,7 +199,7 @@ func runInHarReaderMode() {
func enableExpFeatureIfNeeded() {
if config.Config.OAS {
oasGenerator := dependency.GetInstance(dependency.OasGeneratorDependency).(oas.OasGenerator)
oasGenerator.Start()
oasGenerator.Start(nil)
}
if config.Config.ServiceMap {
serviceMapGenerator := dependency.GetInstance(dependency.ServiceMapGeneratorDependency).(servicemap.ServiceMap)
@@ -371,7 +371,7 @@ func handleIncomingMessageAsTapper(socketConnection *websocket.Conn) {
func initializeDependencies() {
dependency.RegisterGenerator(dependency.ServiceMapGeneratorDependency, func() interface{} { return servicemap.GetDefaultServiceMapInstance() })
dependency.RegisterGenerator(dependency.OasGeneratorDependency, func() interface{} { return oas.GetDefaultOasGeneratorInstance(nil) })
dependency.RegisterGenerator(dependency.OasGeneratorDependency, func() interface{} { return oas.GetDefaultOasGeneratorInstance() })
dependency.RegisterGenerator(dependency.EntriesProvider, func() interface{} { return &entries.BasenineEntriesProvider{} })
dependency.RegisterGenerator(dependency.EntriesSocketStreamer, func() interface{} { return &api.BasenineEntryStreamer{} })
dependency.RegisterGenerator(dependency.EntryStreamerSocketConnector, func() interface{} { return &api.DefaultEntryStreamerSocketConnector{} })

View File

@@ -58,12 +58,12 @@ func getRecorderAndContext() (*httptest.ResponseRecorder, *gin.Context) {
receiveBuffer: bytes.NewBufferString("\n"),
}
dependency.RegisterGenerator(dependency.OasGeneratorDependency, func() interface{} {
return oas.GetDefaultOasGeneratorInstance(dummyConn)
return oas.GetDefaultOasGeneratorInstance()
})
recorder := httptest.NewRecorder()
c, _ := gin.CreateTestContext(recorder)
oas.GetDefaultOasGeneratorInstance(dummyConn).Start()
oas.GetDefaultOasGeneratorInstance(dummyConn).GetServiceSpecs().Store("some", oas.NewGen("some"))
oas.GetDefaultOasGeneratorInstance().Start(dummyConn)
oas.GetDefaultOasGeneratorInstance().GetServiceSpecs().Store("some", oas.NewGen("some"))
return recorder, c
}

View File

@@ -19,7 +19,7 @@ var (
)
type OasGenerator interface {
Start()
Start(conn *basenine.Connection)
Stop()
IsStarted() bool
GetServiceSpecs() *sync.Map
@@ -35,23 +35,41 @@ type defaultOasGenerator struct {
entriesQuery string
}
func GetDefaultOasGeneratorInstance(conn *basenine.Connection) *defaultOasGenerator {
func GetDefaultOasGeneratorInstance() *defaultOasGenerator {
syncOnce.Do(func() {
instance = NewDefaultOasGenerator(conn)
instance = NewDefaultOasGenerator()
logger.Log.Debug("OAS Generator Initialized")
})
return instance
}
func (g *defaultOasGenerator) Start() {
func (g *defaultOasGenerator) Start(conn *basenine.Connection) {
if g.started {
return
}
if g.dbConn == nil {
if conn == nil {
logger.Log.Infof("Creating new DB connection for OAS generator to address %s:%s", shared.BasenineHost, shared.BaseninePort)
newConn, err := basenine.NewConnection(shared.BasenineHost, shared.BaseninePort)
if err != nil {
logger.Log.Error("Error connecting to DB for OAS generator, err: %v", err)
return
}
conn = newConn
}
g.dbConn = conn
}
ctx, cancel := context.WithCancel(context.Background())
g.cancel = cancel
g.ctx = ctx
g.serviceSpecs = &sync.Map{}
g.started = true
go g.runGenerator()
}
@@ -59,8 +77,15 @@ func (g *defaultOasGenerator) Stop() {
if !g.started {
return
}
if g.dbConn != nil {
g.dbConn.Close()
g.dbConn = nil
}
g.cancel()
g.reset()
g.started = false
}
@@ -69,7 +94,7 @@ func (g *defaultOasGenerator) IsStarted() bool {
}
func (g *defaultOasGenerator) runGenerator() {
// Make []byte channels to recieve the data and the meta
// Make []byte channels to receive the data and the meta
dataChan := make(chan []byte)
metaChan := make(chan []byte)
@@ -80,6 +105,8 @@ func (g *defaultOasGenerator) runGenerator() {
select {
case <-g.ctx.Done():
logger.Log.Infof("OAS Generator was canceled")
close(dataChan)
close(metaChan)
return
case metaBytes, ok := <-metaChan:
@@ -181,21 +208,12 @@ func (g *defaultOasGenerator) SetEntriesQuery(query string) bool {
return changed
}
func NewDefaultOasGenerator(conn *basenine.Connection) *defaultOasGenerator {
if conn == nil {
logger.Log.Infof("Creating new DB connection for OAS generator to address %s:%s", shared.BasenineHost, shared.BaseninePort)
newConn, err := basenine.NewConnection(shared.BasenineHost, shared.BaseninePort)
if err != nil {
panic(err)
}
conn = newConn
}
func NewDefaultOasGenerator() *defaultOasGenerator {
return &defaultOasGenerator{
started: false,
ctx: nil,
cancel: nil,
serviceSpecs: nil,
dbConn: conn,
dbConn: nil,
}
}

View File

@@ -3,14 +3,11 @@ package oas
import (
"encoding/json"
"github.com/up9inc/mizu/agent/pkg/har"
"sync"
"testing"
)
func TestOASGen(t *testing.T) {
gen := new(defaultOasGenerator)
gen.dbConn = GetFakeDBConn(`{"startedDateTime": "20000101","request": {"url": "https://host/path", "method": "GET"}, "response": {"status": 200}}`)
gen.serviceSpecs = &sync.Map{}
e := new(har.Entry)
err := json.Unmarshal([]byte(`{"startedDateTime": "20000101","request": {"url": "https://host/path", "method": "GET"}, "response": {"status": 200}}`), e)
@@ -22,7 +19,9 @@ func TestOASGen(t *testing.T) {
Destination: "some",
Entry: *e,
}
gen.Start()
dummyConn := GetFakeDBConn(`{"startedDateTime": "20000101","request": {"url": "https://host/path", "method": "GET"}, "response": {"status": 200}}`)
gen.Start(dummyConn)
gen.handleHARWithSource(ews)
g, ok := gen.serviceSpecs.Load("some")
if !ok {

View File

@@ -61,8 +61,7 @@ func TestEntries(t *testing.T) {
t.FailNow()
}
dummyConn := GetFakeDBConn("\n")
gen := NewDefaultOasGenerator(dummyConn)
gen := NewDefaultOasGenerator()
gen.serviceSpecs = new(sync.Map)
loadStartingOAS("test_artifacts/catalogue.json", "catalogue", gen.serviceSpecs)
loadStartingOAS("test_artifacts/trcc.json", "trcc-api-service", gen.serviceSpecs)
@@ -136,8 +135,7 @@ func TestEntries(t *testing.T) {
}
func TestFileSingle(t *testing.T) {
dummyConn := GetFakeDBConn("\n")
gen := NewDefaultOasGenerator(dummyConn)
gen := NewDefaultOasGenerator()
gen.serviceSpecs = new(sync.Map)
// loadStartingOAS()
file := "test_artifacts/params.har"
@@ -227,8 +225,7 @@ func loadStartingOAS(file string, label string, specs *sync.Map) {
}
func TestEntriesNegative(t *testing.T) {
dummyConn := GetFakeDBConn("\n")
gen := NewDefaultOasGenerator(dummyConn)
gen := NewDefaultOasGenerator()
gen.serviceSpecs = new(sync.Map)
files := []string{"invalid"}
_, err := feedEntries(files, false, gen)
@@ -239,8 +236,7 @@ func TestEntriesNegative(t *testing.T) {
}
func TestEntriesPositive(t *testing.T) {
dummyConn := GetFakeDBConn("\n")
gen := NewDefaultOasGenerator(dummyConn)
gen := NewDefaultOasGenerator()
gen.serviceSpecs = new(sync.Map)
files := []string{"test_artifacts/params.har"}
_, err := feedEntries(files, false, gen)

View File

@@ -1,6 +1,7 @@
package servicemap
import (
"github.com/jinzhu/copier"
"sync"
"github.com/up9inc/mizu/shared/logger"
@@ -183,8 +184,12 @@ func (s *defaultServiceMap) NewTCPEntry(src *tapApi.TCP, dst *tapApi.TCP, p *tap
if len(src.Name) == 0 {
srcEntry = &entryData{
key: key(src.IP),
entry: src,
entry: &tapApi.TCP{},
}
if err := copier.Copy(srcEntry.entry, src); err != nil {
logger.Log.Errorf("Error while copying src entry into src entry data")
}
srcEntry.entry.Name = UnresolvedNodeName
} else {
srcEntry = &entryData{
@@ -196,8 +201,12 @@ func (s *defaultServiceMap) NewTCPEntry(src *tapApi.TCP, dst *tapApi.TCP, p *tap
if len(dst.Name) == 0 {
dstEntry = &entryData{
key: key(dst.IP),
entry: dst,
entry: &tapApi.TCP{},
}
if err := copier.Copy(dstEntry.entry, dst); err != nil {
logger.Log.Errorf("Error while copying dst entry into dst entry data")
}
dstEntry.entry.Name = UnresolvedNodeName
} else {
dstEntry = &entryData{

View File

@@ -161,7 +161,7 @@ type Entry struct {
Capture Capture `json:"capture"`
Source *TCP `json:"src"`
Destination *TCP `json:"dst"`
Namespace string `json:"namespace,omitempty"`
Namespace string `json:"namespace"`
Outgoing bool `json:"outgoing"`
Timestamp int64 `json:"timestamp"`
StartTime time.Time `json:"startTime"`

View File

@@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect:
@mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect5/amqp/\* expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect6/amqp/\* expect

View File

@@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect:
@mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect5/http/\* expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect6/http/\* expect

View File

@@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect:
@mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect5/kafka/\* expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect6/kafka/\* expect

View File

@@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect:
@mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect5/redis/\* expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect6/redis/\* expect

View File

@@ -55,7 +55,7 @@ export const EntriesList: React.FC<EntriesListProps> = ({
const [startTime, setStartTime] = useState(0);
const [truncatedTimestamp, setTruncatedTimestamp] = useState(0);
const leftOffBottom = entries.length > 0 ? entries[entries.length - 1].id : -1;
const leftOffBottom = entries.length > 0 ? entries[entries.length - 1].id + 1 : -1;
useEffect(() => {
const list = document.getElementById('list').firstElementChild;
@@ -98,6 +98,9 @@ export const EntriesList: React.FC<EntriesListProps> = ({
setIsLoadingTop(false);
const newEntries = [...data.data.reverse(), ...entries];
if(newEntries.length > 10000) {
newEntries.splice(10000, newEntries.length - 10000)
}
setEntries(newEntries);
setQueriedTotal(data.meta.total);
@@ -126,9 +129,9 @@ export const EntriesList: React.FC<EntriesListProps> = ({
const entry = message.data;
if (!focusedEntryId) setFocusedEntryId(entry.id.toString());
const newEntries = [...entries, entry];
if (newEntries.length === 10001) {
if (newEntries.length > 10000) {
setLeftOffTop(newEntries[0].id);
newEntries.shift();
newEntries.splice(0, newEntries.length - 10000)
setNoMoreDataTop(false);
}
setEntries(newEntries);

View File

@@ -89,12 +89,13 @@ const EntryTitle: React.FC<any> = ({ protocol, data, elapsedTime }) => {
</div>;
};
const EntrySummary: React.FC<any> = ({ entry }) => {
const EntrySummary: React.FC<any> = ({ entry, namespace }) => {
return <EntryItem
key={`entry-${entry.id}`}
entry={entry}
style={{}}
headingMode={true}
namespace={namespace}
/>;
};
@@ -140,7 +141,7 @@ export const EntryDetailed = () => {
data={entryData.data}
elapsedTime={entryData.data.elapsedTime}
/>}
{!isLoading && entryData && <EntrySummary entry={entryData.base} />}
{!isLoading && entryData && <EntrySummary entry={entryData.base} namespace={entryData.data.namespace} />}
<React.Fragment>
{!isLoading && entryData && <EntryViewer
representation={entryData.representation}

View File

@@ -52,6 +52,7 @@ interface EntryProps {
entry: Entry;
style: object;
headingMode: boolean;
namespace?: string;
}
enum CaptureTypes {
@@ -62,7 +63,7 @@ enum CaptureTypes {
Ebpf = "ebpf",
}
export const EntryItem: React.FC<EntryProps> = ({entry, style, headingMode}) => {
export const EntryItem: React.FC<EntryProps> = ({entry, style, headingMode, namespace}) => {
const [focusedEntryId, setFocusedEntryId] = useRecoilState(focusedEntryIdAtom);
const [queryState, setQuery] = useRecoilState(queryAtom);
@@ -224,6 +225,19 @@ export const EntryItem: React.FC<EntryProps> = ({entry, style, headingMode}) =>
: ""
}
<div className={styles.separatorRight}>
{headingMode ? <Queryable
query={`namespace == "${namespace}"`}
displayIconOnMouseOver={true}
flipped={true}
iconStyle={{marginRight: "16px"}}
>
<span
className={`${styles.tcpInfo} ${styles.ip}`}
title="Namespace"
>
{namespace}
</span>
</Queryable> : null}
<Queryable
query={`src.ip == "${entry.src.ip}"`}
displayIconOnMouseOver={true}

View File

@@ -201,7 +201,9 @@ export const TrafficViewer: React.FC<TrafficViewerProps> = ({
useEffect(() => {
return () => {
ws.current.close();
if (ws?.current?.readyState === WebSocket.OPEN) {
ws.current.close();
}
};
}, []);