Compare commits

...

11 Commits

Author SHA1 Message Date
M. Mert Yıldıran
d844d6eb04 Upgrade to Basenine v0.6.3, add xml and time helpers, make xml and json helpers available in redact helper (#891)
* Upgrade to Basenine `v0.6.2`, add `xml` helper, make `xml` and `json` helpers available in `redact` helper

* URL encode the query

* Upgrade to Basenine `v0.6.3`
2022-03-15 12:21:25 +03:00
David Levanon
6979441422 tls missing addresses (#825)
* stream seen file descriptors from ebpf

* re-generate bpf object files

* fixing pr comments
2022-03-14 15:40:27 +02:00
David Levanon
9ec8347c6c set bpf filter for pcap (#865)
* set bpf filter for pcap

* implement pod updating mechanism

* Update tap/source/netns_packet_source.go

* Update tap/source/netns_packet_source.go

* minor pr fixes

Co-authored-by: Nimrod Gilboa Markevich <59927337+nimrod-up9@users.noreply.github.com>
2022-03-14 15:35:49 +02:00
Igor Gov
617fb89ca5 Build custom branch Github action (#890)
* Build custom branch github action #build_and_publish_custom_image

* #build_and_publish_custom_image

* #build_and_publish_custom_image

* #build_and_publish_custom_image

* #build_and_publish_custom_image

* #build_and_publish_custom_image

* .
2022-03-14 13:15:28 +02:00
Igor Gov
1cbd9cb199 Adding dev latest tag for each pre-release docker (#888) 2022-03-13 09:47:49 +02:00
Igor Gov
23c1b66855 Adding dev latest tag for each pre-release docker (#885) 2022-03-10 17:26:56 +02:00
RoyUP9
f5fa9ff270 Added mizu install template (#884) 2022-03-09 17:52:55 +02:00
David Levanon
4159938cea add minikube over virtualbox cgroup format (#882) 2022-03-09 11:19:11 +02:00
David Levanon
5614e153f3 compile ebpf objects with llvm version 11 (#880) 2022-03-06 17:45:13 +02:00
Igor Gov
5e90d67b0e Run PR validation check only when needed & use docker cache during build (#876)
* Improve PR validation checks
2022-03-06 15:03:43 +02:00
M. Mert Yıldıran
dd430c31d5 Always derive the summary and method fields from the entry in the database on read (#877)
* Always derive the summary and method fields from the entry in the database on read

* Update the expected JSONs in the protocol unit tests

* Add test cases for `Summarize` method

* Remove unused `GetEntry` method, `DataUnmarshaler` struct and `UnmarshalData` method

* Temporarily enable the acceptance tests

* Temporarily disable Slack notification on failure

* Update the Cypress tests

* Fix an issue in Redis

* Fix a typo and the Cypress tests

* Revert "Temporarily disable Slack notification on failure"

This reverts commit cad1901ea4.

* Revert "Temporarily enable the acceptance tests"

This reverts commit bad7706c9b.
2022-03-06 15:41:36 +03:00
63 changed files with 1319 additions and 565 deletions

View File

@@ -0,0 +1,44 @@
name: Build Custom Branch
on: push
concurrency:
group: custom-branch-build-${{ github.ref }}
cancel-in-progress: true
jobs:
build:
name: Push custom branch image to GCR
runs-on: ubuntu-latest
if: ${{ contains(github.event.head_commit.message, '#build_and_publish_custom_image') }}
steps:
- name: Check out the repo
uses: actions/checkout@v2
- id: 'auth'
uses: 'google-github-actions/auth@v0'
with:
credentials_json: '${{ secrets.GCR_JSON_KEY }}'
- name: 'Set up Cloud SDK'
uses: 'google-github-actions/setup-gcloud@v0'
- name: Get base image name
shell: bash
run: echo "##[set-output name=image;]$(echo gcr.io/up9-docker-hub/mizu/${GITHUB_REF#refs/heads/})"
id: base_image_step
- name: Login to GCR
uses: docker/login-action@v1
with:
registry: gcr.io
username: _json_key
password: ${{ secrets.GCR_JSON_KEY }}
- name: Build and push
uses: docker/build-push-action@v2
with:
context: .
push: true
tags: ${{ steps.base_image_step.outputs.image }}:latest

View File

@@ -15,15 +15,23 @@ jobs:
name: CLI executable build
runs-on: ubuntu-latest
steps:
- name: Check out code into the Go module directory
uses: actions/checkout@v2
with:
fetch-depth: 2
- name: Check modified files
id: modified_files
run: devops/check_modified_files.sh cli/
- name: Set up Go 1.17
if: steps.modified_files.outputs.matched == 'true'
uses: actions/setup-go@v2
with:
go-version: '1.17'
- name: Check out code into the Go module directory
uses: actions/checkout@v2
- name: Build CLI
if: steps.modified_files.outputs.matched == 'true'
run: make cli
build-agent:
@@ -32,6 +40,23 @@ jobs:
steps:
- name: Check out code into the Go module directory
uses: actions/checkout@v2
with:
fetch-depth: 2
- name: Check modified files
id: modified_files
run: devops/check_modified_files.sh agent/ shared/ tap/ ui/ Dockerfile
- name: Build Agent
run: make agent-docker
- name: Set up Docker Buildx
if: steps.modified_files.outputs.matched == 'true'
uses: docker/setup-buildx-action@v1
- name: Build
uses: docker/build-push-action@v2
if: steps.modified_files.outputs.matched == 'true'
with:
context: .
push: false
tags: up9inc/mizu:devlatest
cache-from: type=gha
cache-to: type=gha,mode=max

View File

@@ -59,6 +59,7 @@ jobs:
tags: |
type=raw,${{ steps.versioning.outputs.version }}
type=raw,value=latest,enable=${{ steps.condval.outputs.value == 'stable' }}
type=raw,value=dev-latest,enable=${{ steps.condval.outputs.value == 'dev' }}
flavor: |
latest=auto
prefix=
@@ -145,6 +146,7 @@ jobs:
tags: |
type=raw,${{ steps.versioning.outputs.version }}
type=raw,value=latest,enable=${{ steps.condval.outputs.value == 'stable' }}
type=raw,value=dev-latest,enable=${{ steps.condval.outputs.value == 'dev' }}
flavor: |
latest=auto
prefix=
@@ -208,7 +210,7 @@ jobs:
tags: |
type=raw,${{ steps.versioning.outputs.version }}
type=raw,value=latest,enable=${{ steps.condval.outputs.value == 'stable' }}
type=raw,value=dev-latest,enable=${{ steps.condval.outputs.value == 'dev' }}
- name: Login to Docker Hub
uses: docker/login-action@v1
with:

View File

@@ -19,14 +19,16 @@ jobs:
name: Unit Tests
runs-on: ubuntu-latest
steps:
- name: Check out code into the Go module directory
uses: actions/checkout@v2
with:
fetch-depth: 2
- name: Set up Go 1.17
uses: actions/setup-go@v2
with:
go-version: '^1.17'
- name: Check out code into the Go module directory
uses: actions/checkout@v2
- name: Install libpcap
shell: bash
run: |
@@ -40,16 +42,31 @@ jobs:
- name: 'Set up Cloud SDK'
uses: 'google-github-actions/setup-gcloud@v0'
- name: Check CLI modified files
id: cli_modified_files
run: devops/check_modified_files.sh cli/
- name: CLI Test
if: github.event_name == 'push' || steps.cli_modified_files.outputs.matched == 'true'
run: make test-cli
- name: Check Agent modified files
id: agent_modified_files
run: devops/check_modified_files.sh agent/
- name: Agent Test
if: github.event_name == 'push' || steps.agent_modified_files.outputs.matched == 'true'
run: make test-agent
- name: Shared Test
run: make test-shared
- name: Check extensions modified files
id: ext_modified_files
run: devops/check_modified_files.sh tap/extensions/
- name: Extensions Test
if: github.event_name == 'push' || steps.ext_modified_files.outputs.matched == 'true'
run: make test-extensions
- name: Upload coverage to Codecov

View File

@@ -78,8 +78,8 @@ RUN go build -ldflags="-extldflags=-static -s -w \
-X 'github.com/up9inc/mizu/agent/pkg/version.Ver=${VER}'" -o mizuagent .
# Download Basenine executable, verify the sha1sum
ADD https://github.com/up9inc/basenine/releases/download/v0.5.4/basenine_linux_${GOARCH} ./basenine_linux_${GOARCH}
ADD https://github.com/up9inc/basenine/releases/download/v0.5.4/basenine_linux_${GOARCH}.sha256 ./basenine_linux_${GOARCH}.sha256
ADD https://github.com/up9inc/basenine/releases/download/v0.6.3/basenine_linux_${GOARCH} ./basenine_linux_${GOARCH}
ADD https://github.com/up9inc/basenine/releases/download/v0.6.3/basenine_linux_${GOARCH}.sha256 ./basenine_linux_${GOARCH}.sha256
RUN shasum -a 256 -c basenine_linux_${GOARCH}.sha256
RUN chmod +x ./basenine_linux_${GOARCH}
RUN mv ./basenine_linux_${GOARCH} ./basenine

View File

@@ -65,14 +65,14 @@ export function checkThatAllEntriesShown() {
}
export function checkFilterByMethod(funcDict) {
const {protocol, method, summary, hugeMizu} = funcDict;
const summaryDict = getSummeryDict(summary);
const methodDict = getMethodDict(method);
const {protocol, method, methodQuery, summary, summaryQuery} = funcDict;
const summaryDict = getSummaryDict(summary, summaryQuery);
const methodDict = getMethodDict(method, methodQuery);
const protocolDict = getProtocolDict(protocol.name, protocol.text);
it(`Testing the method: ${method}`, function () {
// applying filter
cy.get('.w-tc-editor-text').clear().type(`method == "${method}"`);
cy.get('.w-tc-editor-text').clear().type(methodQuery);
cy.get('[type="submit"]').click();
cy.get('.w-tc-editor').should('have.attr', 'style').and('include', Cypress.env('greenFilterColor'));
@@ -121,7 +121,7 @@ function resizeIfNeeded(entriesLen) {
function deepCheck(generalDict, protocolDict, methodDict, entry) {
const entryNum = getEntryNumById(entry.id);
const {summary, value} = generalDict;
const summaryDict = getSummeryDict(summary);
const summaryDict = getSummaryDict(summary);
leftOnHoverCheck(entryNum, methodDict.pathLeft, methodDict.expectedOnHover);
leftOnHoverCheck(entryNum, protocolDict.pathLeft, protocolDict.expectedOnHover);
@@ -149,13 +149,13 @@ function deepCheck(generalDict, protocolDict, methodDict, entry) {
}
}
function getSummeryDict(summary) {
if (summary) {
function getSummaryDict(value, query) {
if (value) {
return {
pathLeft: '> :nth-child(2) > :nth-child(1) > :nth-child(2) > :nth-child(2)',
pathRight: '> :nth-child(2) > :nth-child(1) > :nth-child(1) > :nth-child(2) > :nth-child(2)',
expectedText: summary,
expectedOnHover: `summary == "${summary}"`
expectedText: value,
expectedOnHover: query
};
}
else {
@@ -163,12 +163,12 @@ function getSummeryDict(summary) {
}
}
function getMethodDict(method) {
function getMethodDict(value, query) {
return {
pathLeft: '> :nth-child(2) > :nth-child(1) > :nth-child(1) > :nth-child(2)',
pathRight: '> :nth-child(2) > :nth-child(1) > :nth-child(1) > :nth-child(1) > :nth-child(2)',
expectedText: method,
expectedOnHover: `method == "${method}"`
expectedText: value,
expectedOnHover: query
};
}

View File

@@ -9,41 +9,53 @@ const rabbitProtocolDetails = {name: 'AMQP', text: 'Advanced Message Queuing Pro
checkFilterByMethod({
protocol: rabbitProtocolDetails,
method: 'exchange declare',
methodQuery: 'request.method == "exchange declare"',
summary: 'exchange',
summaryQuery: 'request.exchange == "exchange"',
value: null
});
checkFilterByMethod({
protocol: rabbitProtocolDetails,
method: 'queue declare',
methodQuery: 'request.method == "queue declare"',
summary: 'queue',
summaryQuery: 'request.queue == "queue"',
value: null
});
checkFilterByMethod({
protocol: rabbitProtocolDetails,
method: 'queue bind',
methodQuery: 'request.method == "queue bind"',
summary: 'queue',
summaryQuery: 'request.queue == "queue"',
value: null
});
checkFilterByMethod({
protocol: rabbitProtocolDetails,
method: 'basic publish',
methodQuery: 'request.method == "basic publish"',
summary: 'exchange',
summaryQuery: 'request.exchange == "exchange"',
value: {tab: valueTabs.request, regex: /^message$/mg}
});
checkFilterByMethod({
protocol: rabbitProtocolDetails,
method: 'basic consume',
methodQuery: 'request.method == "basic consume"',
summary: 'queue',
summaryQuery: 'request.queue == "queue"',
value: null
});
checkFilterByMethod({
protocol: rabbitProtocolDetails,
method: 'basic deliver',
methodQuery: 'request.method == "basic deliver"',
summary: 'exchange',
summaryQuery: 'request.queue == "exchange"',
value: {tab: valueTabs.request, regex: /^message$/mg}
});

View File

@@ -9,34 +9,44 @@ const redisProtocolDetails = {name: 'redis', text: 'Redis Serialization Protocol
checkFilterByMethod({
protocol: redisProtocolDetails,
method: 'PING',
methodQuery: 'request.command == "PING"',
summary: null,
summaryQuery: '',
value: null
})
checkFilterByMethod({
protocol: redisProtocolDetails,
method: 'SET',
methodQuery: 'request.command == "SET"',
summary: 'key',
summaryQuery: 'request.key == "key"',
value: {tab: valueTabs.request, regex: /^\[value, keepttl]$/mg}
})
checkFilterByMethod({
protocol: redisProtocolDetails,
method: 'EXISTS',
methodQuery: 'request.command == "EXISTS"',
summary: 'key',
summaryQuery: 'request.key == "key"',
value: {tab: valueTabs.response, regex: /^1$/mg}
})
checkFilterByMethod({
protocol: redisProtocolDetails,
method: 'GET',
methodQuery: 'request.command == "GET"',
summary: 'key',
summaryQuery: 'request.key == "key"',
value: {tab: valueTabs.response, regex: /^value$/mg}
})
checkFilterByMethod({
protocol: redisProtocolDetails,
method: 'DEL',
methodQuery: 'request.command == "DEL"',
summary: 'key',
summaryQuery: 'request.key == "key"',
value: {tab: valueTabs.response, regex: /^1$|^0$/mg}
})

View File

@@ -113,7 +113,7 @@ if (Cypress.env('shouldCheckSrcAndDest')) {
}
checkFilter({
name: 'method == "GET"',
name: 'request.method == "GET"',
leftSidePath: '> :nth-child(3) > :nth-child(1) > :nth-child(1) > :nth-child(2)',
leftSideExpectedText: 'GET',
rightSidePath: '> :nth-child(2) > :nth-child(2) > :nth-child(1) > :nth-child(1) > :nth-child(2)',
@@ -122,7 +122,7 @@ checkFilter({
});
checkFilter({
name: 'summary == "/get"',
name: 'request.path == "/get"',
leftSidePath: '> :nth-child(3) > :nth-child(1) > :nth-child(2) > :nth-child(2)',
leftSideExpectedText: '/get',
rightSidePath: '> :nth-child(2) > :nth-child(2) > :nth-child(1) > :nth-child(2) > :nth-child(2)',
@@ -139,7 +139,7 @@ checkFilter({
applyByEnter: false
});
checkFilterNoResults('method == "POST"');
checkFilterNoResults('request.method == "POST"');
function checkFilterNoResults(filterName) {
it(`checking the filter: ${filterName}. Expecting no results`, function () {

View File

@@ -22,7 +22,7 @@ require (
github.com/ory/kratos-client-go v0.8.2-alpha.1
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/stretchr/testify v1.7.0
github.com/up9inc/basenine/client/go v0.0.0-20220302182733-74dc40dc2ef0
github.com/up9inc/basenine/client/go v0.0.0-20220315070758-3a76cfc4378e
github.com/up9inc/mizu/shared v0.0.0
github.com/up9inc/mizu/tap v0.0.0
github.com/up9inc/mizu/tap/api v0.0.0

View File

@@ -853,14 +853,8 @@ github.com/ugorji/go v1.2.6/go.mod h1:anCg0y61KIhDlPZmnH+so+RQbysYVyDko0IMgJv0Nn
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/ugorji/go/codec v1.2.6 h1:7kbGefxLoDBuYXOms4yD7223OpNMMPNPZxXk5TvFcyQ=
github.com/ugorji/go/codec v1.2.6/go.mod h1:V6TCNZ4PHqoHGFZuSG1W8nrCzzdgA2DozYxWFFpvxTw=
github.com/up9inc/basenine/client/go v0.0.0-20220220204122-0ef8cb24fab1 h1:0XN8s3HtwUBr9hbWRAFulFMsu1f2cabfJbwpz/sOoLA=
github.com/up9inc/basenine/client/go v0.0.0-20220220204122-0ef8cb24fab1/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
github.com/up9inc/basenine/client/go v0.0.0-20220301135911-d2111357b14e h1:nv/A/AeF8PcU91aHAj6o2cU8fl/46v0ZLj7wgIKjv+o=
github.com/up9inc/basenine/client/go v0.0.0-20220301135911-d2111357b14e/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
github.com/up9inc/basenine/client/go v0.0.0-20220302073458-c32e0adf1500 h1:T1QHxt65NMete/GobVSvcHnwZAQibvahhrMTCgtnSS4=
github.com/up9inc/basenine/client/go v0.0.0-20220302073458-c32e0adf1500/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
github.com/up9inc/basenine/client/go v0.0.0-20220302182733-74dc40dc2ef0 h1:mSqZuJJV4UZyaAoC8x7/AO7DLidlXepFyU18Vm3rFiA=
github.com/up9inc/basenine/client/go v0.0.0-20220302182733-74dc40dc2ef0/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
github.com/up9inc/basenine/client/go v0.0.0-20220315070758-3a76cfc4378e h1:/9dFXqvRDHcwPQdIGHP6iz6M0iAWBPOxYf6C+Ntq5w0=
github.com/up9inc/basenine/client/go v0.0.0-20220315070758-3a76cfc4378e/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
github.com/vektah/gqlparser v1.1.2/go.mod h1:1ycwN7Ij5njmMkPPAOaRFY4rET2Enx7IkVv3vaXspKw=
github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74 h1:gga7acRE695APm9hlsSMoOoE65U4/TcqNj90mc69Rlg=
github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=

View File

@@ -17,6 +17,12 @@ import (
tapApi "github.com/up9inc/mizu/tap/api"
)
var extensionsMap map[string]*tapApi.Extension // global
func InitExtensionsMap(ref map[string]*tapApi.Extension) {
extensionsMap = ref
}
type EventHandlers interface {
WebSocketConnect(socketId int, isTapper bool)
WebSocketDisconnect(socketId int, isTapper bool)
@@ -165,7 +171,8 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even
if params.EnableFullEntries {
message, _ = models.CreateFullEntryWebSocketMessage(entry)
} else {
base := tapApi.Summarize(entry)
extension := extensionsMap[entry.Protocol.Name]
base := extension.Dissector.Summarize(entry)
message, _ = models.CreateBaseEntryWebSocketMessage(base)
}

View File

@@ -60,6 +60,7 @@ func LoadExtensions() {
})
controllers.InitExtensionsMap(ExtensionsMap)
api.InitExtensionsMap(ExtensionsMap)
}
func ConfigureBasenineServer(host string, port string, dbSize int64, logLevel logging.Level, insertionFilter string) {

View File

@@ -77,7 +77,8 @@ func GetEntries(c *gin.Context) {
return // exit
}
base := tapApi.Summarize(entry)
extension := extensionsMap[entry.Protocol.Name]
base := extension.Dissector.Summarize(entry)
dataSlice = append(dataSlice, base)
}
@@ -123,6 +124,7 @@ func GetEntry(c *gin.Context) {
}
extension := extensionsMap[entry.Protocol.Name]
base := extension.Dissector.Summarize(entry)
representation, bodySize, _ := extension.Dissector.Represent(entry.Request, entry.Response)
var rules []map[string]interface{}
@@ -142,6 +144,7 @@ func GetEntry(c *gin.Context) {
Representation: string(representation),
BodySize: bodySize,
Data: entry,
Base: base,
Rules: rules,
IsRulesEnabled: isRulesEnabled,
})

View File

@@ -80,11 +80,7 @@ type httpEntry struct {
CreatedAt time.Time `json:"createdAt"`
Request map[string]interface{} `json:"request"`
Response map[string]interface{} `json:"response"`
Summary string `json:"summary"`
Method string `json:"method"`
Status int `json:"status"`
ElapsedTime int64 `json:"elapsedTime"`
Path string `json:"path"`
}
func (client *client) PushEntry(entry *api.Entry) {
@@ -103,11 +99,7 @@ func (client *client) PushEntry(entry *api.Entry) {
CreatedAt: entry.StartTime,
Request: entry.Request,
Response: entry.Response,
Summary: entry.Summary,
Method: entry.Method,
Status: entry.Status,
ElapsedTime: entry.ElapsedTime,
Path: entry.Path,
}
entryJson, err := json.Marshal(entryToPush)

View File

@@ -12,10 +12,6 @@ import (
"github.com/up9inc/mizu/tap"
)
func GetEntry(r *tapApi.Entry, v tapApi.DataUnmarshaler) error {
return v.UnmarshalData(r)
}
type TapConfig struct {
TappedNamespaces map[string]bool `json:"tappedNamespaces"`
}

View File

@@ -4,11 +4,10 @@ import (
"bytes"
"encoding/json"
"fmt"
"io"
"github.com/up9inc/mizu/cli/utils"
"io/ioutil"
"net/http"
"net/url"
"strings"
"time"
"github.com/up9inc/mizu/shared/kubernetes"
@@ -59,7 +58,7 @@ func (provider *Provider) TestConnection() error {
func (provider *Provider) isReachable() (bool, error) {
echoUrl := fmt.Sprintf("%s/echo", provider.url)
if _, err := provider.get(echoUrl); err != nil {
if _, err := utils.Get(echoUrl, provider.client); err != nil {
return false, err
} else {
return true, nil
@@ -72,7 +71,7 @@ func (provider *Provider) ReportTapperStatus(tapperStatus shared.TapperStatus) e
if jsonValue, err := json.Marshal(tapperStatus); err != nil {
return fmt.Errorf("failed Marshal the tapper status %w", err)
} else {
if _, err := provider.post(tapperStatusUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
if _, err := utils.Post(tapperStatusUrl, "application/json", bytes.NewBuffer(jsonValue), provider.client); err != nil {
return fmt.Errorf("failed sending to API server the tapped pods %w", err)
} else {
logger.Log.Debugf("Reported to server API about tapper status: %v", tapperStatus)
@@ -89,7 +88,7 @@ func (provider *Provider) ReportTappedPods(pods []core.Pod) error {
if jsonValue, err := json.Marshal(podInfos); err != nil {
return fmt.Errorf("failed Marshal the tapped pods %w", err)
} else {
if _, err := provider.post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
if _, err := utils.Post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue), provider.client); err != nil {
return fmt.Errorf("failed sending to API server the tapped pods %w", err)
} else {
logger.Log.Debugf("Reported to server API about %d taped pods successfully", len(podInfos))
@@ -101,7 +100,7 @@ func (provider *Provider) ReportTappedPods(pods []core.Pod) error {
func (provider *Provider) GetGeneralStats() (map[string]interface{}, error) {
generalStatsUrl := fmt.Sprintf("%s/status/general", provider.url)
response, requestErr := provider.get(generalStatsUrl)
response, requestErr := utils.Get(generalStatsUrl, provider.client)
if requestErr != nil {
return nil, fmt.Errorf("failed to get general stats for telemetry, err: %w", requestErr)
}
@@ -126,7 +125,7 @@ func (provider *Provider) GetVersion() (string, error) {
Method: http.MethodGet,
URL: versionUrl,
}
statusResp, err := provider.do(req)
statusResp, err := utils.Do(req, provider.client)
if err != nil {
return "", err
}
@@ -139,40 +138,3 @@ func (provider *Provider) GetVersion() (string, error) {
return versionResponse.Ver, nil
}
// When err is nil, resp always contains a non-nil resp.Body.
// Caller should close resp.Body when done reading from it.
func (provider *Provider) get(url string) (*http.Response, error) {
return provider.checkError(provider.client.Get(url))
}
// When err is nil, resp always contains a non-nil resp.Body.
// Caller should close resp.Body when done reading from it.
func (provider *Provider) post(url, contentType string, body io.Reader) (*http.Response, error) {
return provider.checkError(provider.client.Post(url, contentType, body))
}
// When err is nil, resp always contains a non-nil resp.Body.
// Caller should close resp.Body when done reading from it.
func (provider *Provider) do(req *http.Request) (*http.Response, error) {
return provider.checkError(provider.client.Do(req))
}
func (provider *Provider) checkError(response *http.Response, errInOperation error) (*http.Response, error) {
if (errInOperation != nil) {
return response, errInOperation
// Check only if status != 200 (and not status >= 300). Agent APIs return only 200 on success.
} else if response.StatusCode != http.StatusOK {
body, err := ioutil.ReadAll(response.Body)
response.Body.Close()
response.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
if err != nil {
return response, err
}
errorMsg := strings.ReplaceAll((string(body)), "\n", ";")
return response, fmt.Errorf("got response with status code: %d, body: %s", response.StatusCode, errorMsg)
}
return response, nil
}

42
cli/bucket/provider.go Normal file
View File

@@ -0,0 +1,42 @@
package bucket
import (
"fmt"
"github.com/up9inc/mizu/cli/utils"
"io/ioutil"
"net/http"
"time"
)
type Provider struct {
url string
client *http.Client
}
const DefaultTimeout = 2 * time.Second
func NewProvider(url string, timeout time.Duration) *Provider {
return &Provider{
url: url,
client: &http.Client{
Timeout: timeout,
},
}
}
func (provider *Provider) GetInstallTemplate(templateName string) (string, error) {
url := fmt.Sprintf("%s/%v", provider.url, templateName)
response, err := utils.Get(url, provider.client)
if err != nil {
return "", err
}
defer response.Body.Close()
installTemplate, err := ioutil.ReadAll(response.Body)
if err != nil {
return "", err
}
return string(installTemplate), nil
}

View File

@@ -36,8 +36,8 @@ func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx con
return
}
apiProvider = apiserver.NewProvider(GetApiServerUrl(port), apiserver.DefaultRetries, apiserver.DefaultTimeout)
if err := apiProvider.TestConnection(); err != nil {
provider := apiserver.NewProvider(GetApiServerUrl(port), apiserver.DefaultRetries, apiserver.DefaultTimeout)
if err := provider.TestConnection(); err != nil {
logger.Log.Debugf("Couldn't connect using proxy, stopping proxy and trying to create port-forward")
if err := httpServer.Shutdown(ctx); err != nil {
logger.Log.Debugf("Error occurred while stopping proxy %v", errormessage.FormatError(err))
@@ -51,8 +51,8 @@ func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx con
return
}
apiProvider = apiserver.NewProvider(GetApiServerUrl(port), apiserver.DefaultRetries, apiserver.DefaultTimeout)
if err := apiProvider.TestConnection(); err != nil {
provider = apiserver.NewProvider(GetApiServerUrl(port), apiserver.DefaultRetries, apiserver.DefaultTimeout)
if err := provider.TestConnection(); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", fsUtils.GetLogFilePath()))
cancel()
return

View File

@@ -3,7 +3,6 @@ package cmd
import (
"github.com/spf13/cobra"
"github.com/up9inc/mizu/cli/telemetry"
"github.com/up9inc/mizu/shared/logger"
)
var installCmd = &cobra.Command{
@@ -11,14 +10,7 @@ var installCmd = &cobra.Command{
Short: "Installs mizu components",
RunE: func(cmd *cobra.Command, args []string) error {
go telemetry.ReportRun("install", nil)
logger.Log.Infof("This command has been deprecated, please use helm as described below.\n\n")
logger.Log.Infof("To install stable build of Mizu on your cluster using helm, run the following command:")
logger.Log.Infof(" helm install mizu up9mizu --repo https://static.up9.com/mizu/helm --namespace=mizu --create-namespace\n\n")
logger.Log.Infof("To install development build of Mizu on your cluster using helm, run the following command:")
logger.Log.Infof(" helm install mizu up9mizu --repo https://static.up9.com/mizu/helm-develop --namespace=mizu --create-namespace\n")
runMizuInstall()
return nil
},
}

19
cli/cmd/installRunner.go Normal file
View File

@@ -0,0 +1,19 @@
package cmd
import (
"fmt"
"github.com/up9inc/mizu/cli/bucket"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/shared/logger"
)
func runMizuInstall() {
bucketProvider := bucket.NewProvider(config.Config.Install.TemplateUrl, bucket.DefaultTimeout)
installTemplate, err := bucketProvider.GetInstallTemplate(config.Config.Install.TemplateName)
if err != nil {
logger.Log.Errorf("Failed getting install template, err: %v", err)
return
}
fmt.Print(installTemplate)
}

View File

@@ -23,6 +23,7 @@ const (
type ConfigStruct struct {
Tap configStructs.TapConfig `yaml:"tap"`
Check configStructs.CheckConfig `yaml:"check"`
Install configStructs.InstallConfig `yaml:"install"`
Version configStructs.VersionConfig `yaml:"version"`
View configStructs.ViewConfig `yaml:"view"`
Logs configStructs.LogsConfig `yaml:"logs"`

View File

@@ -0,0 +1,6 @@
package configStructs
type InstallConfig struct {
TemplateUrl string `yaml:"template-url" default:"https://storage.googleapis.com/static.up9.io/mizu/helm-template"`
TemplateName string `yaml:"template-name" default:"helm-template.yaml"`
}

View File

@@ -11,7 +11,7 @@ require (
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
github.com/spf13/cobra v1.3.0
github.com/spf13/pflag v1.0.5
github.com/up9inc/basenine/server/lib v0.0.0-20220302182733-74dc40dc2ef0
github.com/up9inc/basenine/server/lib v0.0.0-20220315070758-3a76cfc4378e
github.com/up9inc/mizu/shared v0.0.0
github.com/up9inc/mizu/tap/api v0.0.0
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8
@@ -35,6 +35,7 @@ require (
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/alecthomas/participle/v2 v2.0.0-alpha7 // indirect
github.com/chai2010/gettext-go v0.0.0-20160711120539-c6fed771bfd5 // indirect
github.com/clbanning/mxj/v2 v2.5.5 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dlclark/regexp2 v1.4.0 // indirect
github.com/docker/go-units v0.4.0 // indirect

View File

@@ -120,6 +120,8 @@ github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5P
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag=
github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I=
github.com/clbanning/mxj/v2 v2.5.5 h1:oT81vUeEiQQ/DcHbzSytRngP6Ky9O+L+0Bw0zSJag9E=
github.com/clbanning/mxj/v2 v2.5.5/go.mod h1:hNiWqW14h+kc+MdF9C6/YoRfjEJoR3ou6tn/Qo+ve2s=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
@@ -598,8 +600,8 @@ github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
github.com/up9inc/basenine/server/lib v0.0.0-20220302182733-74dc40dc2ef0 h1:9PQamOq285DyVsRlS4KB/x2+xkr5QlpiT9Y/BPutS4A=
github.com/up9inc/basenine/server/lib v0.0.0-20220302182733-74dc40dc2ef0/go.mod h1:R9bG4y/iq89jNC0xZ25uKDqenyKFTR3X9acGDOkKWSE=
github.com/up9inc/basenine/server/lib v0.0.0-20220315070758-3a76cfc4378e h1:reG/QwyxdfvGObfdrae7DZc3rTMiGwQ6S/4PRkwtBoE=
github.com/up9inc/basenine/server/lib v0.0.0-20220315070758-3a76cfc4378e/go.mod h1:ZIkxWiJm65jYQIso9k+OZKhR7gQ1we2jNyE2kQX9IQI=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xlab/treeprint v0.0.0-20181112141820-a009c3971eca/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg=
github.com/xlab/treeprint v1.1.0 h1:G/1DjNkPpfZCFt9CSh6b5/nY4VimlbHF3Rh4obvtzDk=

47
cli/utils/httpUtils.go Normal file
View File

@@ -0,0 +1,47 @@
package utils
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"
"strings"
)
// Get - When err is nil, resp always contains a non-nil resp.Body.
// Caller should close resp.Body when done reading from it.
func Get(url string, client *http.Client) (*http.Response, error) {
return checkError(client.Get(url))
}
// Post - When err is nil, resp always contains a non-nil resp.Body.
// Caller should close resp.Body when done reading from it.
func Post(url, contentType string, body io.Reader, client *http.Client) (*http.Response, error) {
return checkError(client.Post(url, contentType, body))
}
// Do - When err is nil, resp always contains a non-nil resp.Body.
// Caller should close resp.Body when done reading from it.
func Do(req *http.Request, client *http.Client) (*http.Response, error) {
return checkError(client.Do(req))
}
func checkError(response *http.Response, errInOperation error) (*http.Response, error) {
if errInOperation != nil {
return response, errInOperation
// Check only if status != 200 (and not status >= 300). Agent APIs return only 200 on success.
} else if response.StatusCode != http.StatusOK {
body, err := ioutil.ReadAll(response.Body)
response.Body.Close()
response.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
if err != nil {
return response, err
}
errorMsg := strings.ReplaceAll(string(body), "\n", ";")
return response, fmt.Errorf("got response with status code: %d, body: %s", response.StatusCode, errorMsg)
}
return response, nil
}

45
devops/check_modified_files.sh Executable file
View File

@@ -0,0 +1,45 @@
#!/bin/bash
paths_arr=( "$@" )
printf "\n========== List modified files ==========\n"
echo "$(git diff --name-only HEAD^ HEAD)"
printf "\n========== List paths to match and check existence ==========\n"
for path in ${paths_arr[*]}
do
if [ -f "$path" ] || [ -d "$path" ]; then
echo "$path - found"
else
echo "$path - does not found - exiting with failure"
exit 1
fi
done
printf "\n========== Check paths of modified files ==========\n"
git diff --name-only HEAD^ HEAD > files.txt
matched=false
while IFS= read -r file
do
for path in ${paths_arr[*]}
do
if [[ $file == $path* ]]; then
echo "$file - match path: $path"
matched=true
break
fi
done
if [[ $matched == true ]]; then
break
else
echo "$file - does not match any given path"
fi
done < files.txt
printf "\n========== Result ==========\n"
if [[ $matched = true ]]; then
echo "match found"
echo "::set-output name=matched::true"
else
echo "no match found"
echo "::set-output name=matched::false"
fi

View File

@@ -325,15 +325,22 @@ func (tapperSyncer *MizuTapperSyncer) updateMizuTappers() error {
tapperSyncer.config.MizuApiFilteringOptions,
tapperSyncer.config.LogLevel,
tapperSyncer.config.ServiceMesh,
tapperSyncer.config.Tls,
); err != nil {
tapperSyncer.config.Tls); err != nil {
return err
}
logger.Log.Debugf("Successfully created %v tappers", len(tapperSyncer.nodeToTappedPodMap))
} else {
if err := tapperSyncer.kubernetesProvider.RemoveDaemonSet(tapperSyncer.context, tapperSyncer.config.MizuResourcesNamespace, TapperDaemonSetName); err != nil {
if err := tapperSyncer.kubernetesProvider.ResetMizuTapperDaemonSet(
tapperSyncer.context,
tapperSyncer.config.MizuResourcesNamespace,
TapperDaemonSetName,
tapperSyncer.config.AgentImage,
TapperPodName); err != nil {
return err
}
logger.Log.Debugf("Successfully reset tapper daemon set")
}
return nil

View File

@@ -449,9 +449,9 @@ func (provider *Provider) CanI(ctx context.Context, namespace string, resource s
Spec: auth.SelfSubjectAccessReviewSpec{
ResourceAttributes: &auth.ResourceAttributes{
Namespace: namespace,
Resource: resource,
Verb: verb,
Group: group,
Resource: resource,
Verb: verb,
Group: group,
},
},
}
@@ -995,6 +995,55 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
return err
}
func (provider *Provider) ResetMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string) error {
agentContainer := applyconfcore.Container()
agentContainer.WithName(tapperPodName)
agentContainer.WithImage(podImage)
nodeSelectorRequirement := applyconfcore.NodeSelectorRequirement()
nodeSelectorRequirement.WithKey("mizu-non-existing-label")
nodeSelectorRequirement.WithOperator(core.NodeSelectorOpExists)
nodeSelectorTerm := applyconfcore.NodeSelectorTerm()
nodeSelectorTerm.WithMatchExpressions(nodeSelectorRequirement)
nodeSelector := applyconfcore.NodeSelector()
nodeSelector.WithNodeSelectorTerms(nodeSelectorTerm)
nodeAffinity := applyconfcore.NodeAffinity()
nodeAffinity.WithRequiredDuringSchedulingIgnoredDuringExecution(nodeSelector)
affinity := applyconfcore.Affinity()
affinity.WithNodeAffinity(nodeAffinity)
podSpec := applyconfcore.PodSpec()
podSpec.WithContainers(agentContainer)
podSpec.WithAffinity(affinity)
podTemplate := applyconfcore.PodTemplateSpec()
podTemplate.WithLabels(map[string]string{
"app": tapperPodName,
LabelManagedBy: provider.managedBy,
LabelCreatedBy: provider.createdBy,
})
podTemplate.WithSpec(podSpec)
labelSelector := applyconfmeta.LabelSelector()
labelSelector.WithMatchLabels(map[string]string{"app": tapperPodName})
applyOptions := metav1.ApplyOptions{
Force: true,
FieldManager: fieldManagerName,
}
daemonSet := applyconfapp.DaemonSet(daemonSetName, namespace)
daemonSet.
WithLabels(map[string]string{
LabelManagedBy: provider.managedBy,
LabelCreatedBy: provider.createdBy,
}).
WithSpec(applyconfapp.DaemonSetSpec().WithSelector(labelSelector).WithTemplate(podTemplate))
_, err := provider.clientSet.AppsV1().DaemonSets(namespace).Apply(ctx, daemonSet, applyOptions)
return err
}
func (provider *Provider) listPodsImpl(ctx context.Context, regex *regexp.Regexp, namespaces []string, listOptions metav1.ListOptions) ([]core.Pod, error) {
var pods []core.Pod
for _, namespace := range namespaces {
@@ -1038,7 +1087,7 @@ func (provider *Provider) ListAllRunningPodsMatchingRegex(ctx context.Context, r
return matchingPods, nil
}
func(provider *Provider) ListPodsByAppLabel(ctx context.Context, namespaces string, labelName string) ([]core.Pod, error) {
func (provider *Provider) ListPodsByAppLabel(ctx context.Context, namespaces string, labelName string) ([]core.Pod, error) {
pods, err := provider.clientSet.CoreV1().Pods(namespaces).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("app=%s", labelName)})
if err != nil {
return nil, err

View File

@@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"sort"
@@ -18,6 +19,9 @@ import (
const mizuTestEnvVar = "MIZU_TEST"
var UnknownIp net.IP = net.IP{0, 0, 0, 0}
var UnknownPort uint16 = 0
type Protocol struct {
Name string `json:"name"`
LongName string `json:"longName"`
@@ -100,6 +104,7 @@ type Dissector interface {
Ping()
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions, reqResMatcher RequestResponseMatcher) error
Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *Entry
Summarize(entry *Entry) *BaseEntry
Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error)
Macros() map[string]string
NewResponseRequestMatcher() RequestResponseMatcher
@@ -135,12 +140,7 @@ type Entry struct {
StartTime time.Time `json:"startTime"`
Request map[string]interface{} `json:"request"`
Response map[string]interface{} `json:"response"`
Summary string `json:"summary"`
Method string `json:"method"`
Status int `json:"status"`
ElapsedTime int64 `json:"elapsedTime"`
Path string `json:"path"`
IsOutgoing bool `json:"isOutgoing,omitempty"`
Rules ApplicableRules `json:"rules,omitempty"`
ContractStatus ContractStatus `json:"contractStatus,omitempty"`
ContractRequestReason string `json:"contractRequestReason,omitempty"`
@@ -154,6 +154,7 @@ type EntryWrapper struct {
Representation string `json:"representation"`
BodySize int64 `json:"bodySize"`
Data *Entry `json:"data"`
Base *BaseEntry `json:"base"`
Rules []map[string]interface{} `json:"rulesMatched,omitempty"`
IsRulesEnabled bool `json:"isRulesEnabled"`
}
@@ -161,11 +162,12 @@ type EntryWrapper struct {
type BaseEntry struct {
Id uint `json:"id"`
Protocol Protocol `json:"proto,omitempty"`
Url string `json:"url,omitempty"`
Path string `json:"path,omitempty"`
Summary string `json:"summary,omitempty"`
StatusCode int `json:"status"`
SummaryQuery string `json:"summaryQuery,omitempty"`
Status int `json:"status"`
StatusQuery string `json:"statusQuery"`
Method string `json:"method,omitempty"`
MethodQuery string `json:"methodQuery,omitempty"`
Timestamp int64 `json:"timestamp,omitempty"`
Source *TCP `json:"src"`
Destination *TCP `json:"dst"`
@@ -190,44 +192,6 @@ type Contract struct {
Content string `json:"content"`
}
func Summarize(entry *Entry) *BaseEntry {
return &BaseEntry{
Id: entry.Id,
Protocol: entry.Protocol,
Path: entry.Path,
Summary: entry.Summary,
StatusCode: entry.Status,
Method: entry.Method,
Timestamp: entry.Timestamp,
Source: entry.Source,
Destination: entry.Destination,
IsOutgoing: entry.IsOutgoing,
Latency: entry.ElapsedTime,
Rules: entry.Rules,
ContractStatus: entry.ContractStatus,
}
}
type DataUnmarshaler interface {
UnmarshalData(*Entry) error
}
func (bed *BaseEntry) UnmarshalData(entry *Entry) error {
bed.Protocol = entry.Protocol
bed.Id = entry.Id
bed.Path = entry.Path
bed.Summary = entry.Summary
bed.StatusCode = entry.Status
bed.Method = entry.Method
bed.Timestamp = entry.Timestamp
bed.Source = entry.Source
bed.Destination = entry.Destination
bed.IsOutgoing = entry.IsOutgoing
bed.Latency = entry.ElapsedTime
bed.ContractStatus = entry.ContractStatus
return nil
}
const (
TABLE string = "table"
BODY string = "body"

View File

@@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect:
@mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect/amqp/\* expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect3/amqp/\* expect

View File

@@ -219,31 +219,6 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
request := item.Pair.Request.Payload.(map[string]interface{})
reqDetails := request["details"].(map[string]interface{})
summary := ""
switch request["method"] {
case basicMethodMap[40]:
summary = reqDetails["exchange"].(string)
case basicMethodMap[60]:
summary = reqDetails["exchange"].(string)
case exchangeMethodMap[10]:
summary = reqDetails["exchange"].(string)
case queueMethodMap[10]:
summary = reqDetails["queue"].(string)
case connectionMethodMap[10]:
summary = fmt.Sprintf(
"%s.%s",
strconv.Itoa(int(reqDetails["versionMajor"].(float64))),
strconv.Itoa(int(reqDetails["versionMinor"].(float64))),
)
case connectionMethodMap[50]:
summary = reqDetails["replyText"].(string)
case queueMethodMap[20]:
summary = reqDetails["queue"].(string)
case basicMethodMap[20]:
summary = reqDetails["queue"].(string)
}
request["url"] = summary
reqDetails["method"] = request["method"]
return &api.Entry{
Protocol: protocol,
@@ -260,17 +235,70 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
Namespace: namespace,
Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails,
Method: request["method"].(string),
Status: 0,
Timestamp: item.Timestamp,
StartTime: item.Pair.Request.CaptureTime,
ElapsedTime: 0,
Summary: summary,
IsOutgoing: item.ConnectionInfo.IsOutgoing,
}
}
func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
summary := ""
summaryQuery := ""
method := entry.Request["method"].(string)
methodQuery := fmt.Sprintf(`request.method == "%s"`, method)
switch method {
case basicMethodMap[40]:
summary = entry.Request["exchange"].(string)
summaryQuery = fmt.Sprintf(`request.exchange == "%s"`, summary)
case basicMethodMap[60]:
summary = entry.Request["exchange"].(string)
summaryQuery = fmt.Sprintf(`request.exchange == "%s"`, summary)
case exchangeMethodMap[10]:
summary = entry.Request["exchange"].(string)
summaryQuery = fmt.Sprintf(`request.exchange == "%s"`, summary)
case queueMethodMap[10]:
summary = entry.Request["queue"].(string)
summaryQuery = fmt.Sprintf(`request.queue == "%s"`, summary)
case connectionMethodMap[10]:
versionMajor := int(entry.Request["versionMajor"].(float64))
versionMinor := int(entry.Request["versionMinor"].(float64))
summary = fmt.Sprintf(
"%s.%s",
strconv.Itoa(versionMajor),
strconv.Itoa(versionMinor),
)
summaryQuery = fmt.Sprintf(`request.versionMajor == %d and request.versionMinor == %d`, versionMajor, versionMinor)
case connectionMethodMap[50]:
summary = entry.Request["replyText"].(string)
summaryQuery = fmt.Sprintf(`request.replyText == "%s"`, summary)
case queueMethodMap[20]:
summary = entry.Request["queue"].(string)
summaryQuery = fmt.Sprintf(`request.queue == "%s"`, summary)
case basicMethodMap[20]:
summary = entry.Request["queue"].(string)
summaryQuery = fmt.Sprintf(`request.queue == "%s"`, summary)
}
return &api.BaseEntry{
Id: entry.Id,
Protocol: entry.Protocol,
Summary: summary,
SummaryQuery: summaryQuery,
Status: 0,
StatusQuery: "",
Method: method,
MethodQuery: methodQuery,
Timestamp: entry.Timestamp,
Source: entry.Source,
Destination: entry.Destination,
IsOutgoing: entry.Outgoing,
Latency: entry.ElapsedTime,
Rules: entry.Rules,
ContractStatus: entry.ContractStatus,
}
}
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) {
bodySize = 0
representation := make(map[string]interface{})

View File

@@ -21,14 +21,16 @@ import (
const (
binDir = "bin"
patternBin = "*_req.bin"
patternDissect = "*.json"
patternExpect = "*.json"
msgDissecting = "Dissecting:"
msgAnalyzing = "Analyzing:"
msgSummarizing = "Summarizing:"
msgRepresenting = "Representing:"
respSuffix = "_res.bin"
expectDir = "expect"
dissectDir = "dissect"
analyzeDir = "analyze"
summarizeDir = "summarize"
representDir = "represent"
testUpdate = "TEST_UPDATE"
)
@@ -186,7 +188,7 @@ func TestAnalyze(t *testing.T) {
}
dissector := NewDissector()
paths, err := filepath.Glob(path.Join(expectDirDissect, patternDissect))
paths, err := filepath.Glob(path.Join(expectDirDissect, patternExpect))
if err != nil {
log.Fatal(err)
}
@@ -230,6 +232,63 @@ func TestAnalyze(t *testing.T) {
}
}
func TestSummarize(t *testing.T) {
_, testUpdateEnabled := os.LookupEnv(testUpdate)
expectDirAnalyze := path.Join(expectDir, analyzeDir)
expectDirSummarize := path.Join(expectDir, summarizeDir)
if testUpdateEnabled {
os.RemoveAll(expectDirSummarize)
err := os.MkdirAll(expectDirSummarize, 0775)
assert.Nil(t, err)
}
dissector := NewDissector()
paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternExpect))
if err != nil {
log.Fatal(err)
}
for _, _path := range paths {
fmt.Printf("%s %s\n", msgSummarizing, _path)
bytes, err := ioutil.ReadFile(_path)
assert.Nil(t, err)
var entries []*api.Entry
err = json.Unmarshal(bytes, &entries)
assert.Nil(t, err)
var baseEntries []*api.BaseEntry
for _, entry := range entries {
baseEntry := dissector.Summarize(entry)
baseEntries = append(baseEntries, baseEntry)
}
pathExpect := path.Join(expectDirSummarize, filepath.Base(_path))
marshaled, err := json.Marshal(baseEntries)
assert.Nil(t, err)
if testUpdateEnabled {
if len(baseEntries) > 0 {
err = os.WriteFile(pathExpect, marshaled, 0644)
assert.Nil(t, err)
}
} else {
if _, err := os.Stat(pathExpect); errors.Is(err, os.ErrNotExist) {
assert.Len(t, entries, 0)
} else {
expectedBytes, err := ioutil.ReadFile(pathExpect)
assert.Nil(t, err)
assert.JSONEq(t, string(expectedBytes), string(marshaled))
}
}
}
}
func TestRepresent(t *testing.T) {
_, testUpdateEnabled := os.LookupEnv(testUpdate)
@@ -243,7 +302,7 @@ func TestRepresent(t *testing.T) {
}
dissector := NewDissector()
paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternDissect))
paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternExpect))
if err != nil {
log.Fatal(err)
}

View File

@@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect:
@mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect2/http/\* expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect3/http/\* expect

View File

@@ -231,7 +231,6 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
reqDetails["targetUri"] = reqDetails["url"]
reqDetails["path"] = path
reqDetails["pathSegments"] = strings.Split(path, "/")[1:]
reqDetails["summary"] = path
// Rearrange the maps for the querying
reqDetails["_headers"] = reqDetails["headers"]
@@ -248,17 +247,11 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
reqDetails["_queryStringMerged"] = mapSliceMergeRepeatedKeys(reqDetails["_queryString"].([]interface{}))
reqDetails["queryString"] = mapSliceRebuildAsMap(reqDetails["_queryStringMerged"].([]interface{}))
method := reqDetails["method"].(string)
statusCode := int(resDetails["status"].(float64))
if item.Protocol.Abbreviation == "gRPC" {
resDetails["statusText"] = grpcStatusCodes[statusCode]
}
if item.Protocol.Version == "2.0" && !isRequestUpgradedH2C {
reqDetails["url"] = path
request["url"] = path
}
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
if elapsedTime < 0 {
elapsedTime = 0
@@ -280,17 +273,40 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails,
Response: resDetails,
Method: method,
Status: statusCode,
Timestamp: item.Timestamp,
StartTime: item.Pair.Request.CaptureTime,
ElapsedTime: elapsedTime,
Summary: path,
IsOutgoing: item.ConnectionInfo.IsOutgoing,
HTTPPair: string(httpPair),
}
}
func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
summary := entry.Request["path"].(string)
summaryQuery := fmt.Sprintf(`request.path == "%s"`, summary)
method := entry.Request["method"].(string)
methodQuery := fmt.Sprintf(`request.method == "%s"`, method)
status := int(entry.Response["status"].(float64))
statusQuery := fmt.Sprintf(`response.status == %d`, status)
return &api.BaseEntry{
Id: entry.Id,
Protocol: entry.Protocol,
Summary: summary,
SummaryQuery: summaryQuery,
Status: status,
StatusQuery: statusQuery,
Method: method,
MethodQuery: methodQuery,
Timestamp: entry.Timestamp,
Source: entry.Source,
Destination: entry.Destination,
IsOutgoing: entry.Outgoing,
Latency: entry.ElapsedTime,
Rules: entry.Rules,
ContractStatus: entry.ContractStatus,
}
}
func representRequest(request map[string]interface{}) (repRequest []interface{}) {
details, _ := json.Marshal([]api.TableData{
{

View File

@@ -21,14 +21,16 @@ import (
const (
binDir = "bin"
patternBin = "*_req.bin"
patternDissect = "*.json"
patternExpect = "*.json"
msgDissecting = "Dissecting:"
msgAnalyzing = "Analyzing:"
msgSummarizing = "Summarizing:"
msgRepresenting = "Representing:"
respSuffix = "_res.bin"
expectDir = "expect"
dissectDir = "dissect"
analyzeDir = "analyze"
summarizeDir = "summarize"
representDir = "represent"
testUpdate = "TEST_UPDATE"
)
@@ -188,7 +190,7 @@ func TestAnalyze(t *testing.T) {
}
dissector := NewDissector()
paths, err := filepath.Glob(path.Join(expectDirDissect, patternDissect))
paths, err := filepath.Glob(path.Join(expectDirDissect, patternExpect))
if err != nil {
log.Fatal(err)
}
@@ -232,6 +234,63 @@ func TestAnalyze(t *testing.T) {
}
}
func TestSummarize(t *testing.T) {
_, testUpdateEnabled := os.LookupEnv(testUpdate)
expectDirAnalyze := path.Join(expectDir, analyzeDir)
expectDirSummarize := path.Join(expectDir, summarizeDir)
if testUpdateEnabled {
os.RemoveAll(expectDirSummarize)
err := os.MkdirAll(expectDirSummarize, 0775)
assert.Nil(t, err)
}
dissector := NewDissector()
paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternExpect))
if err != nil {
log.Fatal(err)
}
for _, _path := range paths {
fmt.Printf("%s %s\n", msgSummarizing, _path)
bytes, err := ioutil.ReadFile(_path)
assert.Nil(t, err)
var entries []*api.Entry
err = json.Unmarshal(bytes, &entries)
assert.Nil(t, err)
var baseEntries []*api.BaseEntry
for _, entry := range entries {
baseEntry := dissector.Summarize(entry)
baseEntries = append(baseEntries, baseEntry)
}
pathExpect := path.Join(expectDirSummarize, filepath.Base(_path))
marshaled, err := json.Marshal(baseEntries)
assert.Nil(t, err)
if testUpdateEnabled {
if len(baseEntries) > 0 {
err = os.WriteFile(pathExpect, marshaled, 0644)
assert.Nil(t, err)
}
} else {
if _, err := os.Stat(pathExpect); errors.Is(err, os.ErrNotExist) {
assert.Len(t, entries, 0)
} else {
expectedBytes, err := ioutil.ReadFile(pathExpect)
assert.Nil(t, err)
assert.JSONEq(t, string(expectedBytes), string(marshaled))
}
}
}
}
func TestRepresent(t *testing.T) {
_, testUpdateEnabled := os.LookupEnv(testUpdate)
@@ -245,7 +304,7 @@ func TestRepresent(t *testing.T) {
}
dissector := NewDissector()
paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternDissect))
paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternExpect))
if err != nil {
log.Fatal(err)
}

View File

@@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect:
@mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect/kafka/\* expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect3/kafka/\* expect

View File

@@ -61,83 +61,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry {
request := item.Pair.Request.Payload.(map[string]interface{})
reqDetails := request["details"].(map[string]interface{})
apiKey := ApiKey(reqDetails["apiKey"].(float64))
summary := ""
switch apiKey {
case Metadata:
_topics := reqDetails["payload"].(map[string]interface{})["topics"]
if _topics == nil {
break
}
topics := _topics.([]interface{})
for _, topic := range topics {
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["name"].(string))
}
if len(summary) > 0 {
summary = summary[:len(summary)-2]
}
case ApiVersions:
summary = reqDetails["clientID"].(string)
case Produce:
_topics := reqDetails["payload"].(map[string]interface{})["topicData"]
if _topics == nil {
break
}
topics := _topics.([]interface{})
for _, topic := range topics {
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["topic"].(string))
}
if len(summary) > 0 {
summary = summary[:len(summary)-2]
}
case Fetch:
_topics := reqDetails["payload"].(map[string]interface{})["topics"]
if _topics == nil {
break
}
topics := _topics.([]interface{})
for _, topic := range topics {
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["topic"].(string))
}
if len(summary) > 0 {
summary = summary[:len(summary)-2]
}
case ListOffsets:
_topics := reqDetails["payload"].(map[string]interface{})["topics"]
if _topics == nil {
break
}
topics := _topics.([]interface{})
for _, topic := range topics {
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["name"].(string))
}
if len(summary) > 0 {
summary = summary[:len(summary)-2]
}
case CreateTopics:
_topics := reqDetails["payload"].(map[string]interface{})["topics"]
if _topics == nil {
break
}
topics := _topics.([]interface{})
for _, topic := range topics {
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["name"].(string))
}
if len(summary) > 0 {
summary = summary[:len(summary)-2]
}
case DeleteTopics:
if reqDetails["topicNames"] == nil {
break
}
topicNames := reqDetails["topicNames"].([]string)
for _, name := range topicNames {
summary += fmt.Sprintf("%s, ", name)
}
}
request["url"] = summary
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
if elapsedTime < 0 {
elapsedTime = 0
@@ -158,13 +82,127 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails,
Response: item.Pair.Response.Payload.(map[string]interface{})["details"].(map[string]interface{}),
Method: apiNames[apiKey],
Status: 0,
Timestamp: item.Timestamp,
StartTime: item.Pair.Request.CaptureTime,
ElapsedTime: elapsedTime,
Summary: summary,
IsOutgoing: item.ConnectionInfo.IsOutgoing,
}
}
func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
status := 0
statusQuery := ""
apiKey := ApiKey(entry.Request["apiKey"].(float64))
method := apiNames[apiKey]
methodQuery := fmt.Sprintf("request.apiKey == %d", int(entry.Request["apiKey"].(float64)))
summary := ""
summaryQuery := ""
switch apiKey {
case Metadata:
_topics := entry.Request["payload"].(map[string]interface{})["topics"]
if _topics == nil {
break
}
topics := _topics.([]interface{})
for i, topic := range topics {
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["name"].(string))
summaryQuery += fmt.Sprintf(`request.payload.topics[%d].name == "%s" and`, i, summary)
}
if len(summary) > 0 {
summary = summary[:len(summary)-2]
summaryQuery = summaryQuery[:len(summaryQuery)-4]
}
case ApiVersions:
summary = entry.Request["clientID"].(string)
summaryQuery = fmt.Sprintf(`request.clientID == "%s"`, summary)
case Produce:
_topics := entry.Request["payload"].(map[string]interface{})["topicData"]
if _topics == nil {
break
}
topics := _topics.([]interface{})
for i, topic := range topics {
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["topic"].(string))
summaryQuery += fmt.Sprintf(`request.payload.topicData[%d].topic == "%s" and`, i, summary)
}
if len(summary) > 0 {
summary = summary[:len(summary)-2]
summaryQuery = summaryQuery[:len(summaryQuery)-4]
}
case Fetch:
_topics := entry.Request["payload"].(map[string]interface{})["topics"]
if _topics == nil {
break
}
topics := _topics.([]interface{})
for i, topic := range topics {
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["topic"].(string))
summaryQuery += fmt.Sprintf(`request.payload.topics[%d].topic == "%s" and`, i, summary)
}
if len(summary) > 0 {
summary = summary[:len(summary)-2]
summaryQuery = summaryQuery[:len(summaryQuery)-4]
}
case ListOffsets:
_topics := entry.Request["payload"].(map[string]interface{})["topics"]
if _topics == nil {
break
}
topics := _topics.([]interface{})
for i, topic := range topics {
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["name"].(string))
summaryQuery += fmt.Sprintf(`request.payload.topics[%d].name == "%s" and`, i, summary)
}
if len(summary) > 0 {
summary = summary[:len(summary)-2]
summaryQuery = summaryQuery[:len(summaryQuery)-4]
}
case CreateTopics:
_topics := entry.Request["payload"].(map[string]interface{})["topics"]
if _topics == nil {
break
}
topics := _topics.([]interface{})
for i, topic := range topics {
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["name"].(string))
summaryQuery += fmt.Sprintf(`request.payload.topics[%d].name == "%s" and`, i, summary)
}
if len(summary) > 0 {
summary = summary[:len(summary)-2]
summaryQuery = summaryQuery[:len(summaryQuery)-4]
}
case DeleteTopics:
if entry.Request["topicNames"] == nil {
break
}
topicNames := entry.Request["topicNames"].([]string)
for i, name := range topicNames {
summary += fmt.Sprintf("%s, ", name)
summaryQuery += fmt.Sprintf(`request.topicNames[%d] == "%s" and`, i, summary)
}
if len(summary) > 0 {
summary = summary[:len(summary)-2]
summaryQuery = summaryQuery[:len(summaryQuery)-4]
}
}
return &api.BaseEntry{
Id: entry.Id,
Protocol: entry.Protocol,
Summary: summary,
SummaryQuery: summaryQuery,
Status: status,
StatusQuery: statusQuery,
Method: method,
MethodQuery: methodQuery,
Timestamp: entry.Timestamp,
Source: entry.Source,
Destination: entry.Destination,
IsOutgoing: entry.Outgoing,
Latency: entry.ElapsedTime,
Rules: entry.Rules,
ContractStatus: entry.ContractStatus,
}
}

View File

@@ -21,14 +21,16 @@ import (
const (
binDir = "bin"
patternBin = "*_req.bin"
patternDissect = "*.json"
patternExpect = "*.json"
msgDissecting = "Dissecting:"
msgAnalyzing = "Analyzing:"
msgSummarizing = "Summarizing:"
msgRepresenting = "Representing:"
respSuffix = "_res.bin"
expectDir = "expect"
dissectDir = "dissect"
analyzeDir = "analyze"
summarizeDir = "summarize"
representDir = "represent"
testUpdate = "TEST_UPDATE"
)
@@ -187,7 +189,7 @@ func TestAnalyze(t *testing.T) {
}
dissector := NewDissector()
paths, err := filepath.Glob(path.Join(expectDirDissect, patternDissect))
paths, err := filepath.Glob(path.Join(expectDirDissect, patternExpect))
if err != nil {
log.Fatal(err)
}
@@ -231,6 +233,63 @@ func TestAnalyze(t *testing.T) {
}
}
func TestSummarize(t *testing.T) {
_, testUpdateEnabled := os.LookupEnv(testUpdate)
expectDirAnalyze := path.Join(expectDir, analyzeDir)
expectDirSummarize := path.Join(expectDir, summarizeDir)
if testUpdateEnabled {
os.RemoveAll(expectDirSummarize)
err := os.MkdirAll(expectDirSummarize, 0775)
assert.Nil(t, err)
}
dissector := NewDissector()
paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternExpect))
if err != nil {
log.Fatal(err)
}
for _, _path := range paths {
fmt.Printf("%s %s\n", msgSummarizing, _path)
bytes, err := ioutil.ReadFile(_path)
assert.Nil(t, err)
var entries []*api.Entry
err = json.Unmarshal(bytes, &entries)
assert.Nil(t, err)
var baseEntries []*api.BaseEntry
for _, entry := range entries {
baseEntry := dissector.Summarize(entry)
baseEntries = append(baseEntries, baseEntry)
}
pathExpect := path.Join(expectDirSummarize, filepath.Base(_path))
marshaled, err := json.Marshal(baseEntries)
assert.Nil(t, err)
if testUpdateEnabled {
if len(baseEntries) > 0 {
err = os.WriteFile(pathExpect, marshaled, 0644)
assert.Nil(t, err)
}
} else {
if _, err := os.Stat(pathExpect); errors.Is(err, os.ErrNotExist) {
assert.Len(t, entries, 0)
} else {
expectedBytes, err := ioutil.ReadFile(pathExpect)
assert.Nil(t, err)
assert.JSONEq(t, string(expectedBytes), string(marshaled))
}
}
}
}
func TestRepresent(t *testing.T) {
_, testUpdateEnabled := os.LookupEnv(testUpdate)
@@ -244,7 +303,7 @@ func TestRepresent(t *testing.T) {
}
dissector := NewDissector()
paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternDissect))
paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternExpect))
if err != nil {
log.Fatal(err)
}

View File

@@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect:
@mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect/redis/\* expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect3/redis/\* expect

View File

@@ -65,17 +65,6 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
reqDetails := request["details"].(map[string]interface{})
resDetails := response["details"].(map[string]interface{})
method := ""
if reqDetails["command"] != nil {
method = reqDetails["command"].(string)
}
summary := ""
if reqDetails["key"] != nil {
summary = reqDetails["key"].(string)
}
request["url"] = summary
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
if elapsedTime < 0 {
elapsedTime = 0
@@ -96,17 +85,50 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails,
Response: resDetails,
Method: method,
Status: 0,
Timestamp: item.Timestamp,
StartTime: item.Pair.Request.CaptureTime,
ElapsedTime: elapsedTime,
Summary: summary,
IsOutgoing: item.ConnectionInfo.IsOutgoing,
}
}
func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
status := 0
statusQuery := ""
method := ""
methodQuery := ""
if entry.Request["command"] != nil {
method = entry.Request["command"].(string)
methodQuery = fmt.Sprintf(`request.command == "%s"`, method)
}
summary := ""
summaryQuery := ""
if entry.Request["key"] != nil {
summary = entry.Request["key"].(string)
summaryQuery = fmt.Sprintf(`request.key == "%s"`, summary)
}
return &api.BaseEntry{
Id: entry.Id,
Protocol: entry.Protocol,
Summary: summary,
SummaryQuery: summaryQuery,
Status: status,
StatusQuery: statusQuery,
Method: method,
MethodQuery: methodQuery,
Timestamp: entry.Timestamp,
Source: entry.Source,
Destination: entry.Destination,
IsOutgoing: entry.Outgoing,
Latency: entry.ElapsedTime,
Rules: entry.Rules,
ContractStatus: entry.ContractStatus,
}
}
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) {
bodySize = 0
representation := make(map[string]interface{})

View File

@@ -22,14 +22,16 @@ import (
const (
binDir = "bin"
patternBin = "*_req.bin"
patternDissect = "*.json"
patternExpect = "*.json"
msgDissecting = "Dissecting:"
msgAnalyzing = "Analyzing:"
msgSummarizing = "Summarizing:"
msgRepresenting = "Representing:"
respSuffix = "_res.bin"
expectDir = "expect"
dissectDir = "dissect"
analyzeDir = "analyze"
summarizeDir = "summarize"
representDir = "represent"
testUpdate = "TEST_UPDATE"
)
@@ -187,7 +189,7 @@ func TestAnalyze(t *testing.T) {
}
dissector := NewDissector()
paths, err := filepath.Glob(path.Join(expectDirDissect, patternDissect))
paths, err := filepath.Glob(path.Join(expectDirDissect, patternExpect))
if err != nil {
log.Fatal(err)
}
@@ -231,6 +233,63 @@ func TestAnalyze(t *testing.T) {
}
}
func TestSummarize(t *testing.T) {
_, testUpdateEnabled := os.LookupEnv(testUpdate)
expectDirAnalyze := path.Join(expectDir, analyzeDir)
expectDirSummarize := path.Join(expectDir, summarizeDir)
if testUpdateEnabled {
os.RemoveAll(expectDirSummarize)
err := os.MkdirAll(expectDirSummarize, 0775)
assert.Nil(t, err)
}
dissector := NewDissector()
paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternExpect))
if err != nil {
log.Fatal(err)
}
for _, _path := range paths {
fmt.Printf("%s %s\n", msgSummarizing, _path)
bytes, err := ioutil.ReadFile(_path)
assert.Nil(t, err)
var entries []*api.Entry
err = json.Unmarshal(bytes, &entries)
assert.Nil(t, err)
var baseEntries []*api.BaseEntry
for _, entry := range entries {
baseEntry := dissector.Summarize(entry)
baseEntries = append(baseEntries, baseEntry)
}
pathExpect := path.Join(expectDirSummarize, filepath.Base(_path))
marshaled, err := json.Marshal(baseEntries)
assert.Nil(t, err)
if testUpdateEnabled {
if len(baseEntries) > 0 {
err = os.WriteFile(pathExpect, marshaled, 0644)
assert.Nil(t, err)
}
} else {
if _, err := os.Stat(pathExpect); errors.Is(err, os.ErrNotExist) {
assert.Len(t, entries, 0)
} else {
expectedBytes, err := ioutil.ReadFile(pathExpect)
assert.Nil(t, err)
assert.JSONEq(t, string(expectedBytes), string(marshaled))
}
}
}
}
func TestRepresent(t *testing.T) {
_, testUpdateEnabled := os.LookupEnv(testUpdate)
@@ -244,7 +303,7 @@ func TestRepresent(t *testing.T) {
}
dissector := NewDissector()
paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternDissect))
paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternExpect))
if err != nil {
log.Fatal(err)
}

View File

@@ -52,7 +52,6 @@ var snaplen = flag.Int("s", 65536, "Snap length (number of bytes max to read per
var tstype = flag.String("timestamp_type", "", "Type of timestamps to use")
var promisc = flag.Bool("promisc", true, "Set promiscuous mode")
var staleTimeoutSeconds = flag.Int("staletimout", 120, "Max time in seconds to keep connections which don't transmit data")
var pids = flag.String("pids", "", "A comma separated list of PIDs to capture their network namespaces")
var servicemesh = flag.Bool("servicemesh", false, "Record decrypted traffic if the cluster is configured with a service mesh and with mtls")
var tls = flag.Bool("tls", false, "Enable TLS tapper")
@@ -190,7 +189,7 @@ func initializePacketSources() error {
}
var err error
if packetSourceManager, err = source.NewPacketSourceManager(*procfs, *pids, *fname, *iface, *servicemesh, tapTargets, behaviour); err != nil {
if packetSourceManager, err = source.NewPacketSourceManager(*procfs, *fname, *iface, *servicemesh, tapTargets, behaviour); err != nil {
return err
} else {
packetSourceManager.ReadPackets(!*nodefrag, mainPacketInputChan)
@@ -248,7 +247,7 @@ func startTlsTapper(extension *api.Extension, outputItems chan *api.OutputChanne
tls := tlstapper.TlsTapper{}
tlsPerfBufferSize := os.Getpagesize() * 100
if err := tls.Init(tlsPerfBufferSize); err != nil {
if err := tls.Init(tlsPerfBufferSize, *procfs, extension); err != nil {
tlstapper.LogError(err)
return
}
@@ -272,6 +271,5 @@ func startTlsTapper(extension *api.Extension, outputItems chan *api.OutputChanne
OutputChannel: outputItems,
}
poller := tlstapper.NewTlsPoller(&tls, extension)
go poller.Poll(extension, emitter, options)
go tls.Poll(emitter, options)
}

View File

@@ -0,0 +1,83 @@
package source
import (
"fmt"
"runtime"
"github.com/up9inc/mizu/shared/logger"
"github.com/vishvananda/netns"
)
func newNetnsPacketSource(procfs string, pid string,
interfaceName string, behaviour TcpPacketSourceBehaviour) (*tcpPacketSource, error) {
nsh, err := netns.GetFromPath(fmt.Sprintf("%s/%s/ns/net", procfs, pid))
if err != nil {
logger.Log.Errorf("Unable to get netns of pid %s - %w", pid, err)
return nil, err
}
src, err := newPacketSourceFromNetnsHandle(pid, nsh, interfaceName, behaviour)
if err != nil {
logger.Log.Errorf("Error starting netns packet source for %s - %w", pid, err)
return nil, err
}
return src, nil
}
func newPacketSourceFromNetnsHandle(pid string, nsh netns.NsHandle, interfaceName string,
behaviour TcpPacketSourceBehaviour) (*tcpPacketSource, error) {
done := make(chan *tcpPacketSource)
errors := make(chan error)
go func(done chan<- *tcpPacketSource) {
// Setting a netns should be done from a dedicated OS thread.
//
// goroutines are not really OS threads, we try to mimic the issue by
// locking the OS thread to this goroutine
//
runtime.LockOSThread()
defer runtime.UnlockOSThread()
oldnetns, err := netns.Get()
if err != nil {
logger.Log.Errorf("Unable to get netns of current thread %w", err)
errors <- err
return
}
if err := netns.Set(nsh); err != nil {
logger.Log.Errorf("Unable to set netns of pid %s - %w", pid, err)
errors <- err
return
}
name := fmt.Sprintf("netns-%s-%s", pid, interfaceName)
src, err := newTcpPacketSource(name, "", interfaceName, behaviour)
if err != nil {
logger.Log.Errorf("Error listening to PID %s - %w", pid, err)
errors <- err
return
}
if err := netns.Set(oldnetns); err != nil {
logger.Log.Errorf("Unable to set back netns of current thread %w", err)
errors <- err
return
}
done <- src
}(done)
select {
case err := <-errors:
return nil, err
case source := <-done:
return source, nil
}
}

View File

@@ -2,109 +2,46 @@ package source
import (
"fmt"
"runtime"
"strconv"
"strings"
"github.com/up9inc/mizu/shared/logger"
"github.com/vishvananda/netns"
v1 "k8s.io/api/core/v1"
)
const bpfFilterMaxPods = 150
const hostSourcePid = "0"
type PacketSourceManager struct {
sources []*tcpPacketSource
sources map[string]*tcpPacketSource
}
func NewPacketSourceManager(procfs string, pids string, filename string, interfaceName string,
func NewPacketSourceManager(procfs string, filename string, interfaceName string,
mtls bool, pods []v1.Pod, behaviour TcpPacketSourceBehaviour) (*PacketSourceManager, error) {
sources := make([]*tcpPacketSource, 0)
sources, err := createHostSource(sources, filename, interfaceName, behaviour)
hostSource, err := newHostPacketSource(filename, interfaceName, behaviour)
if err != nil {
return nil, err
}
sources = createSourcesFromPids(sources, procfs, pids, interfaceName, behaviour)
sources = createSourcesFromEnvoy(sources, mtls, procfs, pods, interfaceName, behaviour)
sources = createSourcesFromLinkerd(sources, mtls, procfs, pods, interfaceName, behaviour)
return &PacketSourceManager{
sources: sources,
}, nil
}
func createHostSource(sources []*tcpPacketSource, filename string, interfaceName string,
behaviour TcpPacketSourceBehaviour) ([]*tcpPacketSource, error) {
hostSource, err := newHostPacketSource(filename, interfaceName, behaviour)
if err != nil {
return sources, err
sourceManager := &PacketSourceManager{
sources: map[string]*tcpPacketSource{
hostSourcePid: hostSource,
},
}
return append(sources, hostSource), nil
}
func createSourcesFromPids(sources []*tcpPacketSource, procfs string, pids string,
interfaceName string, behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
if pids == "" {
return sources
}
netnsSources := newNetnsPacketSources(procfs, strings.Split(pids, ","), interfaceName, behaviour)
sources = append(sources, netnsSources...)
return sources
}
func createSourcesFromEnvoy(sources []*tcpPacketSource, mtls bool, procfs string, pods []v1.Pod,
interfaceName string, behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
if !mtls {
return sources
}
envoyPids, err := discoverRelevantEnvoyPids(procfs, pods)
if err != nil {
logger.Log.Warningf("Unable to discover envoy pids - %v", err)
return sources
}
netnsSources := newNetnsPacketSources(procfs, envoyPids, interfaceName, behaviour)
sources = append(sources, netnsSources...)
return sources
}
func createSourcesFromLinkerd(sources []*tcpPacketSource, mtls bool, procfs string, pods []v1.Pod,
interfaceName string, behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
if !mtls {
return sources
}
linkerdPids, err := discoverRelevantLinkerdPids(procfs, pods)
if err != nil {
logger.Log.Warningf("Unable to discover linkerd pids - %v", err)
return sources
}
netnsSources := newNetnsPacketSources(procfs, linkerdPids, interfaceName, behaviour)
sources = append(sources, netnsSources...)
return sources
sourceManager.UpdatePods(mtls, procfs, pods, interfaceName, behaviour)
return sourceManager, nil
}
func newHostPacketSource(filename string, interfaceName string,
behaviour TcpPacketSourceBehaviour) (*tcpPacketSource, error) {
var name string
if filename == "" {
name = fmt.Sprintf("host-%v", interfaceName)
name = fmt.Sprintf("host-%s", interfaceName)
} else {
name = fmt.Sprintf("file-%v", filename)
name = fmt.Sprintf("file-%s", filename)
}
source, err := newTcpPacketSource(name, filename, interfaceName, behaviour)
if err != nil {
return nil, err
}
@@ -112,90 +49,93 @@ func newHostPacketSource(filename string, interfaceName string,
return source, nil
}
func newNetnsPacketSources(procfs string, pids []string, interfaceName string,
behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
result := make([]*tcpPacketSource, 0)
for _, pidstr := range pids {
pid, err := strconv.Atoi(pidstr)
if err != nil {
logger.Log.Errorf("Invalid PID: %v - %v", pid, err)
continue
}
nsh, err := netns.GetFromPath(fmt.Sprintf("%v/%v/ns/net", procfs, pid))
if err != nil {
logger.Log.Errorf("Unable to get netns of pid %v - %v", pid, err)
continue
}
src, err := newNetnsPacketSource(pid, nsh, interfaceName, behaviour)
if err != nil {
logger.Log.Errorf("Error starting netns packet source for %v - %v", pid, err)
continue
}
result = append(result, src)
func (m *PacketSourceManager) UpdatePods(mtls bool, procfs string, pods []v1.Pod,
interfaceName string, behaviour TcpPacketSourceBehaviour) {
if mtls {
m.updateMtlsPods(procfs, pods, interfaceName, behaviour)
}
return result
m.setBPFFilter(pods)
}
func newNetnsPacketSource(pid int, nsh netns.NsHandle, interfaceName string,
behaviour TcpPacketSourceBehaviour) (*tcpPacketSource, error) {
func (m *PacketSourceManager) updateMtlsPods(procfs string, pods []v1.Pod,
interfaceName string, behaviour TcpPacketSourceBehaviour) {
done := make(chan *tcpPacketSource)
errors := make(chan error)
relevantPids := m.getRelevantPids(procfs, pods)
logger.Log.Infof("Updating mtls pods (new: %v) (current: %v)", relevantPids, m.sources)
go func(done chan<- *tcpPacketSource) {
// Setting a netns should be done from a dedicated OS thread.
//
// goroutines are not really OS threads, we try to mimic the issue by
// locking the OS thread to this goroutine
//
runtime.LockOSThread()
defer runtime.UnlockOSThread()
oldnetns, err := netns.Get()
if err != nil {
logger.Log.Errorf("Unable to get netns of current thread %v", err)
errors <- err
return
for pid, src := range m.sources {
if _, ok := relevantPids[pid]; !ok {
src.close()
delete(m.sources, pid)
}
}
if err := netns.Set(nsh); err != nil {
logger.Log.Errorf("Unable to set netns of pid %v - %v", pid, err)
errors <- err
return
for pid := range relevantPids {
if _, ok := m.sources[pid]; !ok {
source, err := newNetnsPacketSource(procfs, pid, interfaceName, behaviour)
if err == nil {
m.sources[pid] = source
}
}
}
}
name := fmt.Sprintf("netns-%v-%v", pid, interfaceName)
src, err := newTcpPacketSource(name, "", interfaceName, behaviour)
func (m *PacketSourceManager) getRelevantPids(procfs string, pods []v1.Pod) map[string]bool {
relevantPids := make(map[string]bool)
relevantPids[hostSourcePid] = true
if err != nil {
logger.Log.Errorf("Error listening to PID %v - %v", pid, err)
errors <- err
return
if envoyPids, err := discoverRelevantEnvoyPids(procfs, pods); err != nil {
logger.Log.Warningf("Unable to discover envoy pids - %w", err)
} else {
for _, pid := range envoyPids {
relevantPids[pid] = true
}
}
if err := netns.Set(oldnetns); err != nil {
logger.Log.Errorf("Unable to set back netns of current thread %v", err)
errors <- err
return
if linkerdPids, err := discoverRelevantLinkerdPids(procfs, pods); err != nil {
logger.Log.Warningf("Unable to discover linkerd pids - %w", err)
} else {
for _, pid := range linkerdPids {
relevantPids[pid] = true
}
}
done <- src
}(done)
return relevantPids
}
select {
case err := <-errors:
return nil, err
case source := <-done:
return source, nil
func buildBPFExpr(pods []v1.Pod) string {
hostsFilter := make([]string, 0)
for _, pod := range pods {
hostsFilter = append(hostsFilter, fmt.Sprintf("host %s", pod.Status.PodIP))
}
return fmt.Sprintf("%s and port not 443", strings.Join(hostsFilter, " or "))
}
func (m *PacketSourceManager) setBPFFilter(pods []v1.Pod) {
if len(pods) == 0 {
logger.Log.Info("No pods provided, skipping pcap bpf filter")
return
}
var expr string
if len(pods) > bpfFilterMaxPods {
logger.Log.Info("Too many pods for setting ebpf filter %d, setting just not 443", len(pods))
expr = "port not 443"
} else {
expr = buildBPFExpr(pods)
}
logger.Log.Infof("Setting pcap bpf filter %s", expr)
for pid, src := range m.sources {
if err := src.setBPFFilter(expr); err != nil {
logger.Log.Warningf("Error setting bpf filter for %s %v - %w", pid, src, err)
}
}
}

View File

@@ -98,6 +98,14 @@ func newTcpPacketSource(name, filename string, interfaceName string,
return result, nil
}
func (source *tcpPacketSource) String() string {
return source.name
}
func (source *tcpPacketSource) setBPFFilter(expr string) (err error) {
return source.handle.SetBPFFilter(expr)
}
func (source *tcpPacketSource) close() {
if source.handle != nil {
source.handle.Close()

View File

@@ -1,4 +1,4 @@
FROM alpine:3
FROM alpine:3.14
RUN apk --no-cache update && apk --no-cache add clang llvm libbpf-dev go linux-headers

View File

@@ -10,7 +10,7 @@ Copyright (C) UP9 Inc.
#define FLAGS_IS_CLIENT_BIT (1 << 0)
#define FLAGS_IS_READ_BIT (1 << 1)
// The same struct can be found in Chunk.go
// The same struct can be found in chunk.go
//
// Be careful when editing, alignment and padding should be exactly the same in go/c.
//

View File

@@ -8,20 +8,20 @@ import (
"github.com/go-errors/errors"
)
const FLAGS_IS_CLIENT_BIT int32 = (1 << 0)
const FLAGS_IS_READ_BIT int32 = (1 << 1)
const FLAGS_IS_CLIENT_BIT uint32 = (1 << 0)
const FLAGS_IS_READ_BIT uint32 = (1 << 1)
// The same struct can be found in maps.h
//
// Be careful when editing, alignment and padding should be exactly the same in go/c.
//
type tlsChunk struct {
Pid int32
Tgid int32
Len int32
Recorded int32
Fd int32
Flags int32
Pid uint32
Tgid uint32
Len uint32
Recorded uint32
Fd uint32
Flags uint32
Address [16]byte
Data [4096]byte
}
@@ -68,3 +68,7 @@ func (c *tlsChunk) isWrite() bool {
func (c *tlsChunk) getRecordedData() []byte {
return c.Data[:c.Recorded]
}
func (c *tlsChunk) isRequest() bool {
return (c.isClient() && c.isWrite()) || (c.isServer() && c.isRead())
}

View File

@@ -0,0 +1,102 @@
package tlstapper
import (
"fmt"
"io/ioutil"
"net"
"os"
"regexp"
"strconv"
"strings"
"github.com/go-errors/errors"
)
var socketInodeRegex = regexp.MustCompile(`socket:\[(\d+)\]`)
const (
SRC_ADDRESS_FILED_INDEX = 1
DST_ADDRESS_FILED_INDEX = 2
INODE_FILED_INDEX = 9
)
// This file helps to extract Ip and Port out of a Socket file descriptor.
//
// The equivalent bash commands are:
//
// > ls -l /proc/<pid>/fd/<fd>
// Output something like "socket:[1234]" for sockets - 1234 is the inode of the socket
// > cat /proc/<pid>/net/tcp | grep <inode>
// Output a line per ipv4 socket, the 9th field is the inode of the socket
// The 1st and 2nd fields are the source and dest ip and ports in a Hex format
// 0100007F:50 is 127.0.0.1:80
func getAddressBySockfd(procfs string, pid uint32, fd uint32, src bool) (net.IP, uint16, error) {
inode, err := getSocketInode(procfs, pid, fd)
if err != nil {
return nil, 0, err
}
tcppath := fmt.Sprintf("%s/%d/net/tcp", procfs, pid)
tcp, err := ioutil.ReadFile(tcppath)
if err != nil {
return nil, 0, errors.Wrap(err, 0)
}
for _, line := range strings.Split(string(tcp), "\n") {
parts := strings.Fields(line)
if len(parts) < 10 {
continue
}
if inode == parts[INODE_FILED_INDEX] {
if src {
return parseHexAddress(parts[SRC_ADDRESS_FILED_INDEX])
} else {
return parseHexAddress(parts[DST_ADDRESS_FILED_INDEX])
}
}
}
return nil, 0, errors.Errorf("address not found [pid: %d] [sockfd: %d] [inode: %s]", pid, fd, inode)
}
func getSocketInode(procfs string, pid uint32, fd uint32) (string, error) {
fdlinkPath := fmt.Sprintf("%s/%d/fd/%d", procfs, pid, fd)
fdlink, err := os.Readlink(fdlinkPath)
if err != nil {
return "", errors.Wrap(err, 0)
}
tokens := socketInodeRegex.FindStringSubmatch(fdlink)
if tokens == nil || len(tokens) < 1 {
return "", errors.Errorf("socket inode not found [pid: %d] [sockfd: %d] [link: %s]", pid, fd, fdlink)
}
return tokens[1], nil
}
// Format looks like 0100007F:50 for 127.0.0.1:80
//
func parseHexAddress(addr string) (net.IP, uint16, error) {
addrParts := strings.Split(addr, ":")
port, err := strconv.ParseUint(addrParts[1], 16, 16)
if err != nil {
return nil, 0, errors.Wrap(err, 0)
}
ip, err := strconv.ParseUint(addrParts[0], 16, 32)
if err != nil {
return nil, 0, errors.Wrap(err, 0)
}
return net.IP{uint8(ip), uint8(ip >> 8), uint8(ip >> 16), uint8(ip >> 24)}, uint16(port), nil
}

View File

@@ -2,7 +2,6 @@ package tlstapper
import (
"debug/elf"
"fmt"
"github.com/go-errors/errors"
"github.com/up9inc/mizu/shared/logger"
@@ -47,7 +46,7 @@ func findBaseAddress(sslElf *elf.File, sslLibraryPath string) (uint64, error) {
}
}
return 0, errors.New(fmt.Sprintf("Program header not found in %v", sslLibraryPath))
return 0, errors.Errorf("Program header not found in %v", sslLibraryPath)
}
func findSslOffsets(sslElf *elf.File, base uint64) (sslOffsets, error) {

View File

@@ -2,43 +2,64 @@ package tlstapper
import (
"bufio"
"bytes"
"fmt"
"net"
"encoding/binary"
"encoding/hex"
"os"
"strconv"
"strings"
"github.com/cilium/ebpf/perf"
"github.com/go-errors/errors"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/api"
)
const UNKNOWN_PORT uint16 = 80
const UNKNOWN_HOST string = "127.0.0.1"
type tlsPoller struct {
tls *TlsTapper
readers map[string]*tlsReader
closedReaders chan string
reqResMatcher api.RequestResponseMatcher
chunksReader *perf.Reader
extension *api.Extension
procfs string
}
func NewTlsPoller(tls *TlsTapper, extension *api.Extension) *tlsPoller {
func newTlsPoller(tls *TlsTapper, extension *api.Extension, procfs string) *tlsPoller {
return &tlsPoller{
tls: tls,
readers: make(map[string]*tlsReader),
closedReaders: make(chan string, 100),
reqResMatcher: extension.Dissector.NewResponseRequestMatcher(),
extension: extension,
chunksReader: nil,
procfs: procfs,
}
}
func (p *tlsPoller) Poll(extension *api.Extension,
emitter api.Emitter, options *api.TrafficFilteringOptions) {
func (p *tlsPoller) init(bpfObjects *tlsTapperObjects, bufferSize int) error {
var err error
p.chunksReader, err = perf.NewReader(bpfObjects.ChunksBuffer, bufferSize)
if err != nil {
return errors.Wrap(err, 0)
}
return nil
}
func (p *tlsPoller) close() error {
return p.chunksReader.Close()
}
func (p *tlsPoller) poll(emitter api.Emitter, options *api.TrafficFilteringOptions) {
chunks := make(chan *tlsChunk)
go p.tls.pollPerf(chunks)
go p.pollChunksPerfBuffer(chunks)
for {
select {
@@ -47,7 +68,7 @@ func (p *tlsPoller) Poll(extension *api.Extension,
return
}
if err := p.handleTlsChunk(chunk, extension, emitter, options); err != nil {
if err := p.handleTlsChunk(chunk, p.extension, emitter, options); err != nil {
LogError(err)
}
case key := <-p.closedReaders:
@@ -56,6 +77,41 @@ func (p *tlsPoller) Poll(extension *api.Extension,
}
}
func (p *tlsPoller) pollChunksPerfBuffer(chunks chan<- *tlsChunk) {
logger.Log.Infof("Start polling for tls events")
for {
record, err := p.chunksReader.Read()
if err != nil {
close(chunks)
if errors.Is(err, perf.ErrClosed) {
return
}
LogError(errors.Errorf("Error reading chunks from tls perf, aborting TLS! %v", err))
return
}
if record.LostSamples != 0 {
logger.Log.Infof("Buffer is full, dropped %d chunks", record.LostSamples)
continue
}
buffer := bytes.NewReader(record.RawSample)
var chunk tlsChunk
if err := binary.Read(buffer, binary.LittleEndian, &chunk); err != nil {
LogError(errors.Errorf("Error parsing chunk %v", err))
continue
}
chunks <- &chunk
}
}
func (p *tlsPoller) handleTlsChunk(chunk *tlsChunk, extension *api.Extension,
emitter api.Emitter, options *api.TrafficFilteringOptions) error {
ip, port, err := chunk.getAddress()
@@ -75,7 +131,7 @@ func (p *tlsPoller) handleTlsChunk(chunk *tlsChunk, extension *api.Extension,
reader.chunks <- chunk
if os.Getenv("MIZU_VERBOSE_TLS_TAPPER") == "true" {
logTls(chunk, ip, port)
p.logTls(chunk, ip, port)
}
return nil
@@ -92,10 +148,9 @@ func (p *tlsPoller) startNewTlsReader(chunk *tlsChunk, ip net.IP, port uint16, k
},
}
isRequest := (chunk.isClient() && chunk.isWrite()) || (chunk.isServer() && chunk.isRead())
tcpid := buildTcpId(isRequest, ip, port)
tcpid := p.buildTcpId(chunk, ip, port)
go dissect(extension, reader, isRequest, &tcpid, emitter, options, p.reqResMatcher)
go dissect(extension, reader, chunk.isRequest(), &tcpid, emitter, options, p.reqResMatcher)
return reader
}
@@ -120,27 +175,36 @@ func buildTlsKey(chunk *tlsChunk, ip net.IP, port uint16) string {
return fmt.Sprintf("%v:%v-%v:%v", chunk.isClient(), chunk.isRead(), ip, port)
}
func buildTcpId(isRequest bool, ip net.IP, port uint16) api.TcpID {
if isRequest {
func (p *tlsPoller) buildTcpId(chunk *tlsChunk, ip net.IP, port uint16) api.TcpID {
myIp, myPort, err := getAddressBySockfd(p.procfs, chunk.Pid, chunk.Fd, chunk.isClient())
if err != nil {
// May happen if the socket already closed, very likely to happen for localhost
//
myIp = api.UnknownIp
myPort = api.UnknownPort
}
if chunk.isRequest() {
return api.TcpID{
SrcIP: UNKNOWN_HOST,
SrcIP: myIp.String(),
DstIP: ip.String(),
SrcPort: strconv.Itoa(int(UNKNOWN_PORT)),
DstPort: strconv.FormatInt(int64(port), 10),
SrcPort: strconv.FormatUint(uint64(myPort), 10),
DstPort: strconv.FormatUint(uint64(port), 10),
Ident: "",
}
} else {
return api.TcpID{
SrcIP: ip.String(),
DstIP: UNKNOWN_HOST,
SrcPort: strconv.FormatInt(int64(port), 10),
DstPort: strconv.Itoa(int(UNKNOWN_PORT)),
DstIP: myIp.String(),
SrcPort: strconv.FormatUint(uint64(port), 10),
DstPort: strconv.FormatUint(uint64(myPort), 10),
Ident: "",
}
}
}
func logTls(chunk *tlsChunk, ip net.IP, port uint16) {
func (p *tlsPoller) logTls(chunk *tlsChunk, ip net.IP, port uint16) {
var flagsStr string
if chunk.isClient() {
@@ -155,8 +219,13 @@ func logTls(chunk *tlsChunk, ip net.IP, port uint16) {
flagsStr += "W"
}
srcIp, srcPort, _ := getAddressBySockfd(p.procfs, chunk.Pid, chunk.Fd, true)
dstIp, dstPort, _ := getAddressBySockfd(p.procfs, chunk.Pid, chunk.Fd, false)
str := strings.ReplaceAll(strings.ReplaceAll(string(chunk.Data[0:chunk.Recorded]), "\n", " "), "\r", "")
logger.Log.Infof("PID: %v (tid: %v) (fd: %v) (client: %v) (addr: %v:%v) (recorded %v out of %v) - %v - %v",
chunk.Pid, chunk.Tgid, chunk.Fd, flagsStr, ip, port, chunk.Recorded, chunk.Len, str, hex.EncodeToString(chunk.Data[0:chunk.Recorded]))
logger.Log.Infof("PID: %v (tid: %v) (fd: %v) (client: %v) (addr: %v:%v) (fdaddr %v:%v>%v:%v) (recorded %v out of %v) - %v - %v",
chunk.Pid, chunk.Tgid, chunk.Fd, flagsStr, ip, port,
srcIp, srcPort, dstIp, dstPort,
chunk.Recorded, chunk.Len, str, hex.EncodeToString(chunk.Data[0:chunk.Recorded]))
}

View File

@@ -132,8 +132,22 @@ func extractCgroup(lines []string) string {
return ""
}
// cgroup in the /proc/<pid>/cgroup may look something like
//
// /system.slice/docker-<ID>.scope
// /system.slice/containerd-<ID>.scope
// /kubepods.slice/kubepods-burstable.slice/kubepods-burstable-pod3beae8e0_164d_4689_a087_efd902d8c2ab.slice/docker-<ID>.scope
// /kubepods/besteffort/pod7709c1d5-447c-428f-bed9-8ddec35c93f4/<ID>
//
// This function extract the <ID> out of the cgroup path, the <ID> should match
// the "Container ID:" field when running kubectl describe pod <POD>
//
func normalizeCgroup(cgrouppath string) string {
basename := strings.TrimSpace(path.Base(cgrouppath))
if strings.Contains(basename, "-") {
basename = basename[strings.Index(basename, "-") + 1:]
}
if strings.Contains(basename, ".") {
return strings.TrimSuffix(basename, filepath.Ext(basename))

View File

@@ -1,13 +1,10 @@
package tlstapper
import (
"bytes"
"encoding/binary"
"github.com/cilium/ebpf/perf"
"github.com/cilium/ebpf/rlimit"
"github.com/go-errors/errors"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/api"
)
//go:generate go run github.com/cilium/ebpf/cmd/bpf2go tlsTapper bpf/tls_tapper.c -- -O2 -g -D__TARGET_ARCH_x86
@@ -16,10 +13,10 @@ type TlsTapper struct {
bpfObjects tlsTapperObjects
syscallHooks syscallHooks
sslHooksStructs []sslHooks
reader *perf.Reader
poller *tlsPoller
}
func (t *TlsTapper) Init(bufferSize int) error {
func (t *TlsTapper) Init(bufferSize int, procfs string, extension *api.Extension) error {
logger.Log.Infof("Initializing tls tapper (bufferSize: %v)", bufferSize)
if err := setupRLimit(); err != nil {
@@ -27,55 +24,23 @@ func (t *TlsTapper) Init(bufferSize int) error {
}
t.bpfObjects = tlsTapperObjects{}
if err := loadTlsTapperObjects(&t.bpfObjects, nil); err != nil {
return errors.Wrap(err, 0)
}
t.syscallHooks = syscallHooks{}
if err := t.syscallHooks.installSyscallHooks(&t.bpfObjects); err != nil {
return err
}
t.sslHooksStructs = make([]sslHooks, 0)
return t.initChunksReader(bufferSize)
t.poller = newTlsPoller(t, extension, procfs)
return t.poller.init(&t.bpfObjects, bufferSize)
}
func (t *TlsTapper) pollPerf(chunks chan<- *tlsChunk) {
logger.Log.Infof("Start polling for tls events")
for {
record, err := t.reader.Read()
if err != nil {
close(chunks)
if errors.Is(err, perf.ErrClosed) {
return
}
LogError(errors.Errorf("Error reading chunks from tls perf, aborting TLS! %v", err))
return
}
if record.LostSamples != 0 {
logger.Log.Infof("Buffer is full, dropped %d chunks", record.LostSamples)
continue
}
buffer := bytes.NewReader(record.RawSample)
var chunk tlsChunk
if err := binary.Read(buffer, binary.LittleEndian, &chunk); err != nil {
LogError(errors.Errorf("Error parsing chunk %v", err))
continue
}
chunks <- &chunk
}
func (t *TlsTapper) Poll(emitter api.Emitter, options *api.TrafficFilteringOptions) {
t.poller.poll(emitter, options)
}
func (t *TlsTapper) GlobalTap(sslLibrary string) error {
@@ -118,7 +83,7 @@ func (t *TlsTapper) Close() []error {
errors = append(errors, sslHooks.close()...)
}
if err := t.reader.Close(); err != nil {
if err := t.poller.close(); err != nil {
errors = append(errors, err)
}
@@ -135,18 +100,6 @@ func setupRLimit() error {
return nil
}
func (t *TlsTapper) initChunksReader(bufferSize int) error {
var err error
t.reader, err = perf.NewReader(t.bpfObjects.ChunksBuffer, bufferSize)
if err != nil {
return errors.Wrap(err, 0)
}
return nil
}
func (t *TlsTapper) tapPid(pid uint32, sslLibrary string) error {
logger.Log.Infof("Tapping TLS (pid: %v) (sslLibrary: %v)", pid, sslLibrary)

View File

@@ -1,5 +1,4 @@
// Code generated by bpf2go; DO NOT EDIT.
//go:build arm64be || armbe || mips || mips64 || mips64p32 || ppc64 || s390 || s390x || sparc || sparc64
// +build arm64be armbe mips mips64 mips64p32 ppc64 s390 s390x sparc sparc64
package tlstapper

Binary file not shown.

View File

@@ -1,5 +1,4 @@
// Code generated by bpf2go; DO NOT EDIT.
//go:build 386 || amd64 || amd64p32 || arm || arm64 || mips64le || mips64p32le || mipsle || ppc64le || riscv64
// +build 386 amd64 amd64p32 arm arm64 mips64le mips64p32le mipsle ppc64le riscv64
package tlstapper

Binary file not shown.

View File

@@ -119,7 +119,7 @@ export const EntryDetailed = () => {
bodySize={entryData.bodySize}
elapsedTime={entryData.data.elapsedTime}
/>}
{entryData && <EntrySummary entry={entryData.data}/>}
{entryData && <EntrySummary entry={entryData.base}/>}
<>
{entryData && <EntryViewer
representation={entryData.representation}

View File

@@ -25,9 +25,12 @@ interface TCPInterface {
interface Entry {
proto: ProtocolInterface,
method?: string,
methodQuery?: string,
summary: string,
summaryQuery: string,
id: number,
status?: number;
statusQuery?: string;
timestamp: Date;
src: TCPInterface,
dst: TCPInterface,
@@ -152,10 +155,10 @@ export const EntryItem: React.FC<EntryProps> = ({entry, style, headingMode}) =>
horizontal={false}
/> : null}
{isStatusCodeEnabled && <div>
<StatusCode statusCode={entry.status}/>
<StatusCode statusCode={entry.status} statusQuery={entry.statusQuery}/>
</div>}
<div className={styles.endpointServiceContainer} style={{paddingLeft: endpointServiceContainer}}>
<Summary method={entry.method} summary={entry.summary}/>
<Summary method={entry.method} methodQuery={entry.methodQuery} summary={entry.summary} summaryQuery={entry.summaryQuery}/>
<div className={styles.resolvedName}>
<Queryable
query={`src.name == "${entry.src.name}"`}

View File

@@ -10,14 +10,15 @@ export enum StatusCodeClassification {
interface EntryProps {
statusCode: number
statusQuery: string
}
const StatusCode: React.FC<EntryProps> = ({statusCode}) => {
const StatusCode: React.FC<EntryProps> = ({statusCode, statusQuery}) => {
const classification = getClassification(statusCode)
return <Queryable
query={`response.status == ${statusCode}`}
query={statusQuery}
displayIconOnMouseOver={true}
flipped={true}
iconStyle={{marginTop: "40px", paddingLeft: "10px"}}

View File

@@ -5,14 +5,16 @@ import Queryable from "./Queryable";
interface SummaryProps {
method: string
methodQuery: string
summary: string
summaryQuery: string
}
export const Summary: React.FC<SummaryProps> = ({method, summary}) => {
export const Summary: React.FC<SummaryProps> = ({method, methodQuery, summary, summaryQuery}) => {
return <div className={styles.container}>
{method && <Queryable
query={`method == "${method}"`}
query={methodQuery}
className={`${miscStyles.protocol} ${miscStyles.method}`}
displayIconOnMouseOver={true}
style={{whiteSpace: "nowrap"}}
@@ -24,7 +26,7 @@ export const Summary: React.FC<SummaryProps> = ({method, summary}) => {
</span>
</Queryable>}
{summary && <Queryable
query={`summary == "${summary}"`}
query={summaryQuery}
displayIconOnMouseOver={true}
flipped={true}
iconStyle={{zIndex:"5",position:"relative",right:"14px"}}

View File

@@ -52,12 +52,12 @@ export default class Api {
}
getEntry = async (id, query) => {
const response = await this.client.get(`/entries/${id}?query=${query}`);
const response = await this.client.get(`/entries/${id}?query=${encodeURIComponent(query)}`);
return response.data;
}
fetchEntries = async (leftOff, direction, query, limit, timeoutMs) => {
const response = await this.client.get(`/entries/?leftOff=${leftOff}&direction=${direction}&query=${query}&limit=${limit}&timeoutMs=${timeoutMs}`).catch(function (thrown) {
const response = await this.client.get(`/entries/?leftOff=${leftOff}&direction=${direction}&query=${encodeURIComponent(query)}&limit=${limit}&timeoutMs=${timeoutMs}`).catch(function (thrown) {
console.error(thrown.message);
return {};
});