Mirror of https://github.com/kubeshark/kubeshark.git (synced 2026-02-18 03:50:06 +00:00)

Compare commits: 29.0-dev16 ... 30.0-dev2 (11 commits)
| Author | SHA1 | Date |
|---|---|---|
| | d844d6eb04 | |
| | 6979441422 | |
| | 9ec8347c6c | |
| | 617fb89ca5 | |
| | 1cbd9cb199 | |
| | 23c1b66855 | |
| | f5fa9ff270 | |
| | 4159938cea | |
| | 5614e153f3 | |
| | 5e90d67b0e | |
| | dd430c31d5 | |
.github/workflows/build-custom-branch.yml (vendored, new file, 44 lines)

@@ -0,0 +1,44 @@
name: Build Custom Branch

on: push

concurrency:
  group: custom-branch-build-${{ github.ref }}
  cancel-in-progress: true

jobs:
  build:
    name: Push custom branch image to GCR
    runs-on: ubuntu-latest
    if: ${{ contains(github.event.head_commit.message, '#build_and_publish_custom_image') }}

    steps:
      - name: Check out the repo
        uses: actions/checkout@v2

      - id: 'auth'
        uses: 'google-github-actions/auth@v0'
        with:
          credentials_json: '${{ secrets.GCR_JSON_KEY }}'

      - name: 'Set up Cloud SDK'
        uses: 'google-github-actions/setup-gcloud@v0'

      - name: Get base image name
        shell: bash
        run: echo "##[set-output name=image;]$(echo gcr.io/up9-docker-hub/mizu/${GITHUB_REF#refs/heads/})"
        id: base_image_step

      - name: Login to GCR
        uses: docker/login-action@v1
        with:
          registry: gcr.io
          username: _json_key
          password: ${{ secrets.GCR_JSON_KEY }}

      - name: Build and push
        uses: docker/build-push-action@v2
        with:
          context: .
          push: true
          tags: ${{ steps.base_image_step.outputs.image }}:latest
.github/workflows/build.yml (vendored, 35 lines changed)

@@ -15,15 +15,23 @@ jobs:
    name: CLI executable build
    runs-on: ubuntu-latest
    steps:
      - name: Check out code into the Go module directory
        uses: actions/checkout@v2
        with:
          fetch-depth: 2

      - name: Check modified files
        id: modified_files
        run: devops/check_modified_files.sh cli/

      - name: Set up Go 1.17
        if: steps.modified_files.outputs.matched == 'true'
        uses: actions/setup-go@v2
        with:
          go-version: '1.17'

      - name: Check out code into the Go module directory
        uses: actions/checkout@v2

      - name: Build CLI
        if: steps.modified_files.outputs.matched == 'true'
        run: make cli

  build-agent:
@@ -32,6 +40,23 @@ jobs:
    steps:
      - name: Check out code into the Go module directory
        uses: actions/checkout@v2
        with:
          fetch-depth: 2

      - name: Check modified files
        id: modified_files
        run: devops/check_modified_files.sh agent/ shared/ tap/ ui/ Dockerfile

      - name: Build Agent
        run: make agent-docker
      - name: Set up Docker Buildx
        if: steps.modified_files.outputs.matched == 'true'
        uses: docker/setup-buildx-action@v1

      - name: Build
        uses: docker/build-push-action@v2
        if: steps.modified_files.outputs.matched == 'true'
        with:
          context: .
          push: false
          tags: up9inc/mizu:devlatest
          cache-from: type=gha
          cache-to: type=gha,mode=max
.github/workflows/release.yml (vendored, 4 lines changed)

@@ -59,6 +59,7 @@ jobs:
          tags: |
            type=raw,${{ steps.versioning.outputs.version }}
            type=raw,value=latest,enable=${{ steps.condval.outputs.value == 'stable' }}
            type=raw,value=dev-latest,enable=${{ steps.condval.outputs.value == 'dev' }}
          flavor: |
            latest=auto
            prefix=
@@ -145,6 +146,7 @@ jobs:
          tags: |
            type=raw,${{ steps.versioning.outputs.version }}
            type=raw,value=latest,enable=${{ steps.condval.outputs.value == 'stable' }}
            type=raw,value=dev-latest,enable=${{ steps.condval.outputs.value == 'dev' }}
          flavor: |
            latest=auto
            prefix=
@@ -208,7 +210,7 @@ jobs:
          tags: |
            type=raw,${{ steps.versioning.outputs.version }}
            type=raw,value=latest,enable=${{ steps.condval.outputs.value == 'stable' }}

            type=raw,value=dev-latest,enable=${{ steps.condval.outputs.value == 'dev' }}
      - name: Login to Docker Hub
        uses: docker/login-action@v1
        with:
.github/workflows/test.yml (vendored, 23 lines changed)

@@ -19,14 +19,16 @@ jobs:
    name: Unit Tests
    runs-on: ubuntu-latest
    steps:
      - name: Check out code into the Go module directory
        uses: actions/checkout@v2
        with:
          fetch-depth: 2

      - name: Set up Go 1.17
        uses: actions/setup-go@v2
        with:
          go-version: '^1.17'

      - name: Check out code into the Go module directory
        uses: actions/checkout@v2

      - name: Install libpcap
        shell: bash
        run: |
@@ -40,16 +42,31 @@ jobs:
      - name: 'Set up Cloud SDK'
        uses: 'google-github-actions/setup-gcloud@v0'

      - name: Check CLI modified files
        id: cli_modified_files
        run: devops/check_modified_files.sh cli/

      - name: CLI Test
        if: github.event_name == 'push' || steps.cli_modified_files.outputs.matched == 'true'
        run: make test-cli

      - name: Check Agent modified files
        id: agent_modified_files
        run: devops/check_modified_files.sh agent/

      - name: Agent Test
        if: github.event_name == 'push' || steps.agent_modified_files.outputs.matched == 'true'
        run: make test-agent

      - name: Shared Test
        run: make test-shared

      - name: Check extensions modified files
        id: ext_modified_files
        run: devops/check_modified_files.sh tap/extensions/

      - name: Extensions Test
        if: github.event_name == 'push' || steps.ext_modified_files.outputs.matched == 'true'
        run: make test-extensions

      - name: Upload coverage to Codecov
@@ -78,8 +78,8 @@ RUN go build -ldflags="-extldflags=-static -s -w \
    -X 'github.com/up9inc/mizu/agent/pkg/version.Ver=${VER}'" -o mizuagent .

# Download Basenine executable, verify the sha1sum
ADD https://github.com/up9inc/basenine/releases/download/v0.5.4/basenine_linux_${GOARCH} ./basenine_linux_${GOARCH}
ADD https://github.com/up9inc/basenine/releases/download/v0.5.4/basenine_linux_${GOARCH}.sha256 ./basenine_linux_${GOARCH}.sha256
ADD https://github.com/up9inc/basenine/releases/download/v0.6.3/basenine_linux_${GOARCH} ./basenine_linux_${GOARCH}
ADD https://github.com/up9inc/basenine/releases/download/v0.6.3/basenine_linux_${GOARCH}.sha256 ./basenine_linux_${GOARCH}.sha256
RUN shasum -a 256 -c basenine_linux_${GOARCH}.sha256
RUN chmod +x ./basenine_linux_${GOARCH}
RUN mv ./basenine_linux_${GOARCH} ./basenine
@@ -65,14 +65,14 @@ export function checkThatAllEntriesShown() {
}

export function checkFilterByMethod(funcDict) {
    const {protocol, method, summary, hugeMizu} = funcDict;
    const summaryDict = getSummeryDict(summary);
    const methodDict = getMethodDict(method);
    const {protocol, method, methodQuery, summary, summaryQuery} = funcDict;
    const summaryDict = getSummaryDict(summary, summaryQuery);
    const methodDict = getMethodDict(method, methodQuery);
    const protocolDict = getProtocolDict(protocol.name, protocol.text);

    it(`Testing the method: ${method}`, function () {
        // applying filter
        cy.get('.w-tc-editor-text').clear().type(`method == "${method}"`);
        cy.get('.w-tc-editor-text').clear().type(methodQuery);
        cy.get('[type="submit"]').click();
        cy.get('.w-tc-editor').should('have.attr', 'style').and('include', Cypress.env('greenFilterColor'));

@@ -121,7 +121,7 @@ function resizeIfNeeded(entriesLen) {
function deepCheck(generalDict, protocolDict, methodDict, entry) {
    const entryNum = getEntryNumById(entry.id);
    const {summary, value} = generalDict;
    const summaryDict = getSummeryDict(summary);
    const summaryDict = getSummaryDict(summary);

    leftOnHoverCheck(entryNum, methodDict.pathLeft, methodDict.expectedOnHover);
    leftOnHoverCheck(entryNum, protocolDict.pathLeft, protocolDict.expectedOnHover);
@@ -149,13 +149,13 @@ function deepCheck(generalDict, protocolDict, methodDict, entry) {
    }
}

function getSummeryDict(summary) {
    if (summary) {
function getSummaryDict(value, query) {
    if (value) {
        return {
            pathLeft: '> :nth-child(2) > :nth-child(1) > :nth-child(2) > :nth-child(2)',
            pathRight: '> :nth-child(2) > :nth-child(1) > :nth-child(1) > :nth-child(2) > :nth-child(2)',
            expectedText: summary,
            expectedOnHover: `summary == "${summary}"`
            expectedText: value,
            expectedOnHover: query
        };
    }
    else {
@@ -163,12 +163,12 @@ function getSummeryDict(summary) {
    }
}

function getMethodDict(method) {
function getMethodDict(value, query) {
    return {
        pathLeft: '> :nth-child(2) > :nth-child(1) > :nth-child(1) > :nth-child(2)',
        pathRight: '> :nth-child(2) > :nth-child(1) > :nth-child(1) > :nth-child(1) > :nth-child(2)',
        expectedText: method,
        expectedOnHover: `method == "${method}"`
        expectedText: value,
        expectedOnHover: query
    };
}
@@ -9,41 +9,53 @@ const rabbitProtocolDetails = {name: 'AMQP', text: 'Advanced Message Queuing Pro
checkFilterByMethod({
    protocol: rabbitProtocolDetails,
    method: 'exchange declare',
    methodQuery: 'request.method == "exchange declare"',
    summary: 'exchange',
    summaryQuery: 'request.exchange == "exchange"',
    value: null
});

checkFilterByMethod({
    protocol: rabbitProtocolDetails,
    method: 'queue declare',
    methodQuery: 'request.method == "queue declare"',
    summary: 'queue',
    summaryQuery: 'request.queue == "queue"',
    value: null
});

checkFilterByMethod({
    protocol: rabbitProtocolDetails,
    method: 'queue bind',
    methodQuery: 'request.method == "queue bind"',
    summary: 'queue',
    summaryQuery: 'request.queue == "queue"',
    value: null
});

checkFilterByMethod({
    protocol: rabbitProtocolDetails,
    method: 'basic publish',
    methodQuery: 'request.method == "basic publish"',
    summary: 'exchange',
    summaryQuery: 'request.exchange == "exchange"',
    value: {tab: valueTabs.request, regex: /^message$/mg}
});

checkFilterByMethod({
    protocol: rabbitProtocolDetails,
    method: 'basic consume',
    methodQuery: 'request.method == "basic consume"',
    summary: 'queue',
    summaryQuery: 'request.queue == "queue"',
    value: null
});

checkFilterByMethod({
    protocol: rabbitProtocolDetails,
    method: 'basic deliver',
    methodQuery: 'request.method == "basic deliver"',
    summary: 'exchange',
    summaryQuery: 'request.queue == "exchange"',
    value: {tab: valueTabs.request, regex: /^message$/mg}
});
@@ -9,34 +9,44 @@ const redisProtocolDetails = {name: 'redis', text: 'Redis Serialization Protocol
checkFilterByMethod({
    protocol: redisProtocolDetails,
    method: 'PING',
    methodQuery: 'request.command == "PING"',
    summary: null,
    summaryQuery: '',
    value: null
})

checkFilterByMethod({
    protocol: redisProtocolDetails,
    method: 'SET',
    methodQuery: 'request.command == "SET"',
    summary: 'key',
    summaryQuery: 'request.key == "key"',
    value: {tab: valueTabs.request, regex: /^\[value, keepttl]$/mg}
})

checkFilterByMethod({
    protocol: redisProtocolDetails,
    method: 'EXISTS',
    methodQuery: 'request.command == "EXISTS"',
    summary: 'key',
    summaryQuery: 'request.key == "key"',
    value: {tab: valueTabs.response, regex: /^1$/mg}
})

checkFilterByMethod({
    protocol: redisProtocolDetails,
    method: 'GET',
    methodQuery: 'request.command == "GET"',
    summary: 'key',
    summaryQuery: 'request.key == "key"',
    value: {tab: valueTabs.response, regex: /^value$/mg}
})

checkFilterByMethod({
    protocol: redisProtocolDetails,
    method: 'DEL',
    methodQuery: 'request.command == "DEL"',
    summary: 'key',
    summaryQuery: 'request.key == "key"',
    value: {tab: valueTabs.response, regex: /^1$|^0$/mg}
})
@@ -113,7 +113,7 @@ if (Cypress.env('shouldCheckSrcAndDest')) {
}

checkFilter({
    name: 'method == "GET"',
    name: 'request.method == "GET"',
    leftSidePath: '> :nth-child(3) > :nth-child(1) > :nth-child(1) > :nth-child(2)',
    leftSideExpectedText: 'GET',
    rightSidePath: '> :nth-child(2) > :nth-child(2) > :nth-child(1) > :nth-child(1) > :nth-child(2)',
@@ -122,7 +122,7 @@ checkFilter({
});

checkFilter({
    name: 'summary == "/get"',
    name: 'request.path == "/get"',
    leftSidePath: '> :nth-child(3) > :nth-child(1) > :nth-child(2) > :nth-child(2)',
    leftSideExpectedText: '/get',
    rightSidePath: '> :nth-child(2) > :nth-child(2) > :nth-child(1) > :nth-child(2) > :nth-child(2)',
@@ -139,7 +139,7 @@ checkFilter({
    applyByEnter: false
});

checkFilterNoResults('method == "POST"');
checkFilterNoResults('request.method == "POST"');

function checkFilterNoResults(filterName) {
    it(`checking the filter: ${filterName}. Expecting no results`, function () {
@@ -22,7 +22,7 @@ require (
    github.com/ory/kratos-client-go v0.8.2-alpha.1
    github.com/patrickmn/go-cache v2.1.0+incompatible
    github.com/stretchr/testify v1.7.0
    github.com/up9inc/basenine/client/go v0.0.0-20220302182733-74dc40dc2ef0
    github.com/up9inc/basenine/client/go v0.0.0-20220315070758-3a76cfc4378e
    github.com/up9inc/mizu/shared v0.0.0
    github.com/up9inc/mizu/tap v0.0.0
    github.com/up9inc/mizu/tap/api v0.0.0
agent/go.sum (10 lines changed)

@@ -853,14 +853,8 @@ github.com/ugorji/go v1.2.6/go.mod h1:anCg0y61KIhDlPZmnH+so+RQbysYVyDko0IMgJv0Nn
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/ugorji/go/codec v1.2.6 h1:7kbGefxLoDBuYXOms4yD7223OpNMMPNPZxXk5TvFcyQ=
github.com/ugorji/go/codec v1.2.6/go.mod h1:V6TCNZ4PHqoHGFZuSG1W8nrCzzdgA2DozYxWFFpvxTw=
github.com/up9inc/basenine/client/go v0.0.0-20220220204122-0ef8cb24fab1 h1:0XN8s3HtwUBr9hbWRAFulFMsu1f2cabfJbwpz/sOoLA=
github.com/up9inc/basenine/client/go v0.0.0-20220220204122-0ef8cb24fab1/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
github.com/up9inc/basenine/client/go v0.0.0-20220301135911-d2111357b14e h1:nv/A/AeF8PcU91aHAj6o2cU8fl/46v0ZLj7wgIKjv+o=
github.com/up9inc/basenine/client/go v0.0.0-20220301135911-d2111357b14e/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
github.com/up9inc/basenine/client/go v0.0.0-20220302073458-c32e0adf1500 h1:T1QHxt65NMete/GobVSvcHnwZAQibvahhrMTCgtnSS4=
github.com/up9inc/basenine/client/go v0.0.0-20220302073458-c32e0adf1500/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
github.com/up9inc/basenine/client/go v0.0.0-20220302182733-74dc40dc2ef0 h1:mSqZuJJV4UZyaAoC8x7/AO7DLidlXepFyU18Vm3rFiA=
github.com/up9inc/basenine/client/go v0.0.0-20220302182733-74dc40dc2ef0/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
github.com/up9inc/basenine/client/go v0.0.0-20220315070758-3a76cfc4378e h1:/9dFXqvRDHcwPQdIGHP6iz6M0iAWBPOxYf6C+Ntq5w0=
github.com/up9inc/basenine/client/go v0.0.0-20220315070758-3a76cfc4378e/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
github.com/vektah/gqlparser v1.1.2/go.mod h1:1ycwN7Ij5njmMkPPAOaRFY4rET2Enx7IkVv3vaXspKw=
github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74 h1:gga7acRE695APm9hlsSMoOoE65U4/TcqNj90mc69Rlg=
github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
@@ -17,6 +17,12 @@ import (
    tapApi "github.com/up9inc/mizu/tap/api"
)

var extensionsMap map[string]*tapApi.Extension // global

func InitExtensionsMap(ref map[string]*tapApi.Extension) {
    extensionsMap = ref
}

type EventHandlers interface {
    WebSocketConnect(socketId int, isTapper bool)
    WebSocketDisconnect(socketId int, isTapper bool)
@@ -165,7 +171,8 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even
    if params.EnableFullEntries {
        message, _ = models.CreateFullEntryWebSocketMessage(entry)
    } else {
        base := tapApi.Summarize(entry)
        extension := extensionsMap[entry.Protocol.Name]
        base := extension.Dissector.Summarize(entry)
        message, _ = models.CreateBaseEntryWebSocketMessage(base)
    }
@@ -60,6 +60,7 @@ func LoadExtensions() {
    })

    controllers.InitExtensionsMap(ExtensionsMap)
    api.InitExtensionsMap(ExtensionsMap)
}

func ConfigureBasenineServer(host string, port string, dbSize int64, logLevel logging.Level, insertionFilter string) {
@@ -77,7 +77,8 @@ func GetEntries(c *gin.Context) {
            return // exit
        }

        base := tapApi.Summarize(entry)
        extension := extensionsMap[entry.Protocol.Name]
        base := extension.Dissector.Summarize(entry)

        dataSlice = append(dataSlice, base)
    }
@@ -123,6 +124,7 @@ func GetEntry(c *gin.Context) {
    }

    extension := extensionsMap[entry.Protocol.Name]
    base := extension.Dissector.Summarize(entry)
    representation, bodySize, _ := extension.Dissector.Represent(entry.Request, entry.Response)

    var rules []map[string]interface{}
@@ -142,6 +144,7 @@ func GetEntry(c *gin.Context) {
        Representation: string(representation),
        BodySize:       bodySize,
        Data:           entry,
        Base:           base,
        Rules:          rules,
        IsRulesEnabled: isRulesEnabled,
    })
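The controller and WebSocket changes above share one pattern: instead of the old package-level tapApi.Summarize, the agent now looks the entry's protocol up in the extensions map registered via InitExtensionsMap and calls that dissector's own Summarize. A minimal sketch of that dispatch, with a hypothetical summarizeEntry helper added only for illustration (the real handlers index the map directly and assume the extension exists):

```go
// summarizeEntry resolves the dissector registered for the entry's protocol
// and delegates summarization to it. It returns false when no extension was
// registered under that protocol name.
func summarizeEntry(entry *tapApi.Entry) (*tapApi.BaseEntry, bool) {
	extension, ok := extensionsMap[entry.Protocol.Name]
	if !ok || extension == nil {
		return nil, false
	}
	return extension.Dissector.Summarize(entry), true
}
```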
@@ -80,11 +80,7 @@ type httpEntry struct {
    CreatedAt time.Time `json:"createdAt"`
    Request map[string]interface{} `json:"request"`
    Response map[string]interface{} `json:"response"`
    Summary string `json:"summary"`
    Method string `json:"method"`
    Status int `json:"status"`
    ElapsedTime int64 `json:"elapsedTime"`
    Path string `json:"path"`
}

func (client *client) PushEntry(entry *api.Entry) {
@@ -103,11 +99,7 @@ func (client *client) PushEntry(entry *api.Entry) {
        CreatedAt: entry.StartTime,
        Request: entry.Request,
        Response: entry.Response,
        Summary: entry.Summary,
        Method: entry.Method,
        Status: entry.Status,
        ElapsedTime: entry.ElapsedTime,
        Path: entry.Path,
    }

    entryJson, err := json.Marshal(entryToPush)
@@ -12,10 +12,6 @@ import (
    "github.com/up9inc/mizu/tap"
)

func GetEntry(r *tapApi.Entry, v tapApi.DataUnmarshaler) error {
    return v.UnmarshalData(r)
}

type TapConfig struct {
    TappedNamespaces map[string]bool `json:"tappedNamespaces"`
}
@@ -4,11 +4,10 @@ import (
    "bytes"
    "encoding/json"
    "fmt"
    "io"
    "github.com/up9inc/mizu/cli/utils"
    "io/ioutil"
    "net/http"
    "net/url"
    "strings"
    "time"

    "github.com/up9inc/mizu/shared/kubernetes"
@@ -59,7 +58,7 @@ func (provider *Provider) TestConnection() error {

func (provider *Provider) isReachable() (bool, error) {
    echoUrl := fmt.Sprintf("%s/echo", provider.url)
    if _, err := provider.get(echoUrl); err != nil {
    if _, err := utils.Get(echoUrl, provider.client); err != nil {
        return false, err
    } else {
        return true, nil
@@ -72,7 +71,7 @@ func (provider *Provider) ReportTapperStatus(tapperStatus shared.TapperStatus) e
    if jsonValue, err := json.Marshal(tapperStatus); err != nil {
        return fmt.Errorf("failed Marshal the tapper status %w", err)
    } else {
        if _, err := provider.post(tapperStatusUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
        if _, err := utils.Post(tapperStatusUrl, "application/json", bytes.NewBuffer(jsonValue), provider.client); err != nil {
            return fmt.Errorf("failed sending to API server the tapped pods %w", err)
        } else {
            logger.Log.Debugf("Reported to server API about tapper status: %v", tapperStatus)
@@ -89,7 +88,7 @@ func (provider *Provider) ReportTappedPods(pods []core.Pod) error {
    if jsonValue, err := json.Marshal(podInfos); err != nil {
        return fmt.Errorf("failed Marshal the tapped pods %w", err)
    } else {
        if _, err := provider.post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
        if _, err := utils.Post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue), provider.client); err != nil {
            return fmt.Errorf("failed sending to API server the tapped pods %w", err)
        } else {
            logger.Log.Debugf("Reported to server API about %d taped pods successfully", len(podInfos))
@@ -101,7 +100,7 @@ func (provider *Provider) ReportTappedPods(pods []core.Pod) error {
func (provider *Provider) GetGeneralStats() (map[string]interface{}, error) {
    generalStatsUrl := fmt.Sprintf("%s/status/general", provider.url)

    response, requestErr := provider.get(generalStatsUrl)
    response, requestErr := utils.Get(generalStatsUrl, provider.client)
    if requestErr != nil {
        return nil, fmt.Errorf("failed to get general stats for telemetry, err: %w", requestErr)
    }
@@ -126,7 +125,7 @@ func (provider *Provider) GetVersion() (string, error) {
        Method: http.MethodGet,
        URL: versionUrl,
    }
    statusResp, err := provider.do(req)
    statusResp, err := utils.Do(req, provider.client)
    if err != nil {
        return "", err
    }
@@ -139,40 +138,3 @@ func (provider *Provider) GetVersion() (string, error) {

    return versionResponse.Ver, nil
}

// When err is nil, resp always contains a non-nil resp.Body.
// Caller should close resp.Body when done reading from it.
func (provider *Provider) get(url string) (*http.Response, error) {
    return provider.checkError(provider.client.Get(url))
}

// When err is nil, resp always contains a non-nil resp.Body.
// Caller should close resp.Body when done reading from it.
func (provider *Provider) post(url, contentType string, body io.Reader) (*http.Response, error) {
    return provider.checkError(provider.client.Post(url, contentType, body))
}

// When err is nil, resp always contains a non-nil resp.Body.
// Caller should close resp.Body when done reading from it.
func (provider *Provider) do(req *http.Request) (*http.Response, error) {
    return provider.checkError(provider.client.Do(req))
}

func (provider *Provider) checkError(response *http.Response, errInOperation error) (*http.Response, error) {
    if (errInOperation != nil) {
        return response, errInOperation
    // Check only if status != 200 (and not status >= 300). Agent APIs return only 200 on success.
    } else if response.StatusCode != http.StatusOK {
        body, err := ioutil.ReadAll(response.Body)
        response.Body.Close()
        response.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
        if err != nil {
            return response, err
        }

        errorMsg := strings.ReplaceAll((string(body)), "\n", ";")
        return response, fmt.Errorf("got response with status code: %d, body: %s", response.StatusCode, errorMsg)
    }

    return response, nil
}
cli/bucket/provider.go (new file, 42 lines)

@@ -0,0 +1,42 @@
package bucket

import (
    "fmt"
    "github.com/up9inc/mizu/cli/utils"
    "io/ioutil"
    "net/http"
    "time"
)

type Provider struct {
    url string
    client *http.Client
}

const DefaultTimeout = 2 * time.Second

func NewProvider(url string, timeout time.Duration) *Provider {
    return &Provider{
        url: url,
        client: &http.Client{
            Timeout: timeout,
        },
    }
}

func (provider *Provider) GetInstallTemplate(templateName string) (string, error) {
    url := fmt.Sprintf("%s/%v", provider.url, templateName)
    response, err := utils.Get(url, provider.client)
    if err != nil {
        return "", err
    }

    defer response.Body.Close()

    installTemplate, err := ioutil.ReadAll(response.Body)
    if err != nil {
        return "", err
    }

    return string(installTemplate), nil
}
@@ -36,8 +36,8 @@ func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx con
        return
    }

    apiProvider = apiserver.NewProvider(GetApiServerUrl(port), apiserver.DefaultRetries, apiserver.DefaultTimeout)
    if err := apiProvider.TestConnection(); err != nil {
    provider := apiserver.NewProvider(GetApiServerUrl(port), apiserver.DefaultRetries, apiserver.DefaultTimeout)
    if err := provider.TestConnection(); err != nil {
        logger.Log.Debugf("Couldn't connect using proxy, stopping proxy and trying to create port-forward")
        if err := httpServer.Shutdown(ctx); err != nil {
            logger.Log.Debugf("Error occurred while stopping proxy %v", errormessage.FormatError(err))
@@ -51,8 +51,8 @@ func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx con
        return
    }

    apiProvider = apiserver.NewProvider(GetApiServerUrl(port), apiserver.DefaultRetries, apiserver.DefaultTimeout)
    if err := apiProvider.TestConnection(); err != nil {
    provider = apiserver.NewProvider(GetApiServerUrl(port), apiserver.DefaultRetries, apiserver.DefaultTimeout)
    if err := provider.TestConnection(); err != nil {
        logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", fsUtils.GetLogFilePath()))
        cancel()
        return
@@ -3,7 +3,6 @@ package cmd
import (
    "github.com/spf13/cobra"
    "github.com/up9inc/mizu/cli/telemetry"
    "github.com/up9inc/mizu/shared/logger"
)

var installCmd = &cobra.Command{
@@ -11,14 +10,7 @@ var installCmd = &cobra.Command{
    Short: "Installs mizu components",
    RunE: func(cmd *cobra.Command, args []string) error {
        go telemetry.ReportRun("install", nil)
        logger.Log.Infof("This command has been deprecated, please use helm as described below.\n\n")

        logger.Log.Infof("To install stable build of Mizu on your cluster using helm, run the following command:")
        logger.Log.Infof(" helm install mizu up9mizu --repo https://static.up9.com/mizu/helm --namespace=mizu --create-namespace\n\n")

        logger.Log.Infof("To install development build of Mizu on your cluster using helm, run the following command:")
        logger.Log.Infof(" helm install mizu up9mizu --repo https://static.up9.com/mizu/helm-develop --namespace=mizu --create-namespace\n")

        runMizuInstall()
        return nil
    },
}
cli/cmd/installRunner.go (new file, 19 lines)

@@ -0,0 +1,19 @@
package cmd

import (
    "fmt"
    "github.com/up9inc/mizu/cli/bucket"
    "github.com/up9inc/mizu/cli/config"
    "github.com/up9inc/mizu/shared/logger"
)

func runMizuInstall() {
    bucketProvider := bucket.NewProvider(config.Config.Install.TemplateUrl, bucket.DefaultTimeout)
    installTemplate, err := bucketProvider.GetInstallTemplate(config.Config.Install.TemplateName)
    if err != nil {
        logger.Log.Errorf("Failed getting install template, err: %v", err)
        return
    }

    fmt.Print(installTemplate)
}
@@ -23,6 +23,7 @@ const (
type ConfigStruct struct {
    Tap configStructs.TapConfig `yaml:"tap"`
    Check configStructs.CheckConfig `yaml:"check"`
    Install configStructs.InstallConfig `yaml:"install"`
    Version configStructs.VersionConfig `yaml:"version"`
    View configStructs.ViewConfig `yaml:"view"`
    Logs configStructs.LogsConfig `yaml:"logs"`
cli/config/configStructs/installConfig.go (new file, 6 lines)

@@ -0,0 +1,6 @@
package configStructs

type InstallConfig struct {
    TemplateUrl string `yaml:"template-url" default:"https://storage.googleapis.com/static.up9.io/mizu/helm-template"`
    TemplateName string `yaml:"template-name" default:"helm-template.yaml"`
}
@@ -11,7 +11,7 @@ require (
    github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
    github.com/spf13/cobra v1.3.0
    github.com/spf13/pflag v1.0.5
    github.com/up9inc/basenine/server/lib v0.0.0-20220302182733-74dc40dc2ef0
    github.com/up9inc/basenine/server/lib v0.0.0-20220315070758-3a76cfc4378e
    github.com/up9inc/mizu/shared v0.0.0
    github.com/up9inc/mizu/tap/api v0.0.0
    golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8
@@ -35,6 +35,7 @@ require (
    github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
    github.com/alecthomas/participle/v2 v2.0.0-alpha7 // indirect
    github.com/chai2010/gettext-go v0.0.0-20160711120539-c6fed771bfd5 // indirect
    github.com/clbanning/mxj/v2 v2.5.5 // indirect
    github.com/davecgh/go-spew v1.1.1 // indirect
    github.com/dlclark/regexp2 v1.4.0 // indirect
    github.com/docker/go-units v0.4.0 // indirect
@@ -120,6 +120,8 @@ github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5P
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag=
github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I=
github.com/clbanning/mxj/v2 v2.5.5 h1:oT81vUeEiQQ/DcHbzSytRngP6Ky9O+L+0Bw0zSJag9E=
github.com/clbanning/mxj/v2 v2.5.5/go.mod h1:hNiWqW14h+kc+MdF9C6/YoRfjEJoR3ou6tn/Qo+ve2s=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
@@ -598,8 +600,8 @@ github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
github.com/up9inc/basenine/server/lib v0.0.0-20220302182733-74dc40dc2ef0 h1:9PQamOq285DyVsRlS4KB/x2+xkr5QlpiT9Y/BPutS4A=
github.com/up9inc/basenine/server/lib v0.0.0-20220302182733-74dc40dc2ef0/go.mod h1:R9bG4y/iq89jNC0xZ25uKDqenyKFTR3X9acGDOkKWSE=
github.com/up9inc/basenine/server/lib v0.0.0-20220315070758-3a76cfc4378e h1:reG/QwyxdfvGObfdrae7DZc3rTMiGwQ6S/4PRkwtBoE=
github.com/up9inc/basenine/server/lib v0.0.0-20220315070758-3a76cfc4378e/go.mod h1:ZIkxWiJm65jYQIso9k+OZKhR7gQ1we2jNyE2kQX9IQI=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xlab/treeprint v0.0.0-20181112141820-a009c3971eca/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg=
github.com/xlab/treeprint v1.1.0 h1:G/1DjNkPpfZCFt9CSh6b5/nY4VimlbHF3Rh4obvtzDk=
cli/utils/httpUtils.go (new file, 47 lines)

@@ -0,0 +1,47 @@
package utils

import (
    "bytes"
    "fmt"
    "io"
    "io/ioutil"
    "net/http"
    "strings"
)

// Get - When err is nil, resp always contains a non-nil resp.Body.
// Caller should close resp.Body when done reading from it.
func Get(url string, client *http.Client) (*http.Response, error) {
    return checkError(client.Get(url))
}

// Post - When err is nil, resp always contains a non-nil resp.Body.
// Caller should close resp.Body when done reading from it.
func Post(url, contentType string, body io.Reader, client *http.Client) (*http.Response, error) {
    return checkError(client.Post(url, contentType, body))
}

// Do - When err is nil, resp always contains a non-nil resp.Body.
// Caller should close resp.Body when done reading from it.
func Do(req *http.Request, client *http.Client) (*http.Response, error) {
    return checkError(client.Do(req))
}

func checkError(response *http.Response, errInOperation error) (*http.Response, error) {
    if errInOperation != nil {
        return response, errInOperation
    // Check only if status != 200 (and not status >= 300). Agent APIs return only 200 on success.
    } else if response.StatusCode != http.StatusOK {
        body, err := ioutil.ReadAll(response.Body)
        response.Body.Close()
        response.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
        if err != nil {
            return response, err
        }

        errorMsg := strings.ReplaceAll(string(body), "\n", ";")
        return response, fmt.Errorf("got response with status code: %d, body: %s", response.StatusCode, errorMsg)
    }

    return response, nil
}
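These helpers replace the private get/post/do methods that previously lived on the API-server provider, so any CLI component that owns an *http.Client can share the same non-200 error handling. A short usage sketch under that assumption; the fetchEcho function and the base URL are hypothetical and only illustrate the intended call pattern (compare the bucket provider above and the rewritten apiserver provider):

```go
package example

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"time"

	"github.com/up9inc/mizu/cli/utils"
)

// fetchEcho is a hypothetical helper showing how callers are expected to use
// the shared utils.Get wrapper with a timeout-bound client.
func fetchEcho(baseUrl string) (string, error) {
	client := &http.Client{Timeout: 2 * time.Second}

	resp, err := utils.Get(fmt.Sprintf("%s/echo", baseUrl), client)
	if err != nil {
		// Non-200 responses are already turned into errors by checkError.
		return "", err
	}
	defer resp.Body.Close() // caller closes the body, as the doc comments require

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}
	return string(body), nil
}
```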
devops/check_modified_files.sh (new executable file, 45 lines)

@@ -0,0 +1,45 @@
#!/bin/bash
paths_arr=( "$@" )

printf "\n========== List modified files ==========\n"
echo "$(git diff --name-only HEAD^ HEAD)"

printf "\n========== List paths to match and check existence ==========\n"
for path in ${paths_arr[*]}
do
  if [ -f "$path" ] || [ -d "$path" ]; then
    echo "$path - found"
  else
    echo "$path - does not found - exiting with failure"
    exit 1
  fi
done

printf "\n========== Check paths of modified files ==========\n"
git diff --name-only HEAD^ HEAD > files.txt
matched=false
while IFS= read -r file
do
  for path in ${paths_arr[*]}
  do
    if [[ $file == $path* ]]; then
      echo "$file - match path: $path"
      matched=true
      break
    fi
  done
  if [[ $matched == true ]]; then
    break
  else
    echo "$file - does not match any given path"
  fi
done < files.txt

printf "\n========== Result ==========\n"
if [[ $matched = true ]]; then
  echo "match found"
  echo "::set-output name=matched::true"
else
  echo "no match found"
  echo "::set-output name=matched::false"
fi
@@ -325,15 +325,22 @@ func (tapperSyncer *MizuTapperSyncer) updateMizuTappers() error {
            tapperSyncer.config.MizuApiFilteringOptions,
            tapperSyncer.config.LogLevel,
            tapperSyncer.config.ServiceMesh,
            tapperSyncer.config.Tls,
        ); err != nil {
            tapperSyncer.config.Tls); err != nil {
            return err
        }

        logger.Log.Debugf("Successfully created %v tappers", len(tapperSyncer.nodeToTappedPodMap))
    } else {
        if err := tapperSyncer.kubernetesProvider.RemoveDaemonSet(tapperSyncer.context, tapperSyncer.config.MizuResourcesNamespace, TapperDaemonSetName); err != nil {
        if err := tapperSyncer.kubernetesProvider.ResetMizuTapperDaemonSet(
            tapperSyncer.context,
            tapperSyncer.config.MizuResourcesNamespace,
            TapperDaemonSetName,
            tapperSyncer.config.AgentImage,
            TapperPodName); err != nil {
            return err
        }

        logger.Log.Debugf("Successfully reset tapper daemon set")
    }

    return nil
@@ -449,9 +449,9 @@ func (provider *Provider) CanI(ctx context.Context, namespace string, resource s
        Spec: auth.SelfSubjectAccessReviewSpec{
            ResourceAttributes: &auth.ResourceAttributes{
                Namespace: namespace,
                Resource: resource,
                Verb: verb,
                Group: group,
                Resource:  resource,
                Verb:      verb,
                Group:     group,
            },
        },
    }
@@ -995,6 +995,55 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
    return err
}

func (provider *Provider) ResetMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string) error {
    agentContainer := applyconfcore.Container()
    agentContainer.WithName(tapperPodName)
    agentContainer.WithImage(podImage)

    nodeSelectorRequirement := applyconfcore.NodeSelectorRequirement()
    nodeSelectorRequirement.WithKey("mizu-non-existing-label")
    nodeSelectorRequirement.WithOperator(core.NodeSelectorOpExists)
    nodeSelectorTerm := applyconfcore.NodeSelectorTerm()
    nodeSelectorTerm.WithMatchExpressions(nodeSelectorRequirement)
    nodeSelector := applyconfcore.NodeSelector()
    nodeSelector.WithNodeSelectorTerms(nodeSelectorTerm)
    nodeAffinity := applyconfcore.NodeAffinity()
    nodeAffinity.WithRequiredDuringSchedulingIgnoredDuringExecution(nodeSelector)
    affinity := applyconfcore.Affinity()
    affinity.WithNodeAffinity(nodeAffinity)

    podSpec := applyconfcore.PodSpec()
    podSpec.WithContainers(agentContainer)
    podSpec.WithAffinity(affinity)

    podTemplate := applyconfcore.PodTemplateSpec()
    podTemplate.WithLabels(map[string]string{
        "app": tapperPodName,
        LabelManagedBy: provider.managedBy,
        LabelCreatedBy: provider.createdBy,
    })
    podTemplate.WithSpec(podSpec)

    labelSelector := applyconfmeta.LabelSelector()
    labelSelector.WithMatchLabels(map[string]string{"app": tapperPodName})

    applyOptions := metav1.ApplyOptions{
        Force: true,
        FieldManager: fieldManagerName,
    }

    daemonSet := applyconfapp.DaemonSet(daemonSetName, namespace)
    daemonSet.
        WithLabels(map[string]string{
            LabelManagedBy: provider.managedBy,
            LabelCreatedBy: provider.createdBy,
        }).
        WithSpec(applyconfapp.DaemonSetSpec().WithSelector(labelSelector).WithTemplate(podTemplate))

    _, err := provider.clientSet.AppsV1().DaemonSets(namespace).Apply(ctx, daemonSet, applyOptions)
    return err
}

func (provider *Provider) listPodsImpl(ctx context.Context, regex *regexp.Regexp, namespaces []string, listOptions metav1.ListOptions) ([]core.Pod, error) {
    var pods []core.Pod
    for _, namespace := range namespaces {
@@ -1038,7 +1087,7 @@ func (provider *Provider) ListAllRunningPodsMatchingRegex(ctx context.Context, r
    return matchingPods, nil
}

func(provider *Provider) ListPodsByAppLabel(ctx context.Context, namespaces string, labelName string) ([]core.Pod, error) {
func (provider *Provider) ListPodsByAppLabel(ctx context.Context, namespaces string, labelName string) ([]core.Pod, error) {
    pods, err := provider.clientSet.CoreV1().Pods(namespaces).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("app=%s", labelName)})
    if err != nil {
        return nil, err
@@ -7,6 +7,7 @@ import (
    "errors"
    "fmt"
    "io/ioutil"
    "net"
    "net/http"
    "os"
    "sort"
@@ -18,6 +19,9 @@ import (

const mizuTestEnvVar = "MIZU_TEST"

var UnknownIp net.IP = net.IP{0, 0, 0, 0}
var UnknownPort uint16 = 0

type Protocol struct {
    Name string `json:"name"`
    LongName string `json:"longName"`
@@ -100,6 +104,7 @@ type Dissector interface {
    Ping()
    Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions, reqResMatcher RequestResponseMatcher) error
    Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *Entry
    Summarize(entry *Entry) *BaseEntry
    Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error)
    Macros() map[string]string
    NewResponseRequestMatcher() RequestResponseMatcher
@@ -135,12 +140,7 @@ type Entry struct {
    StartTime time.Time `json:"startTime"`
    Request map[string]interface{} `json:"request"`
    Response map[string]interface{} `json:"response"`
    Summary string `json:"summary"`
    Method string `json:"method"`
    Status int `json:"status"`
    ElapsedTime int64 `json:"elapsedTime"`
    Path string `json:"path"`
    IsOutgoing bool `json:"isOutgoing,omitempty"`
    Rules ApplicableRules `json:"rules,omitempty"`
    ContractStatus ContractStatus `json:"contractStatus,omitempty"`
    ContractRequestReason string `json:"contractRequestReason,omitempty"`
@@ -154,6 +154,7 @@ type EntryWrapper struct {
    Representation string `json:"representation"`
    BodySize int64 `json:"bodySize"`
    Data *Entry `json:"data"`
    Base *BaseEntry `json:"base"`
    Rules []map[string]interface{} `json:"rulesMatched,omitempty"`
    IsRulesEnabled bool `json:"isRulesEnabled"`
}
@@ -161,11 +162,12 @@ type EntryWrapper struct {
type BaseEntry struct {
    Id uint `json:"id"`
    Protocol Protocol `json:"proto,omitempty"`
    Url string `json:"url,omitempty"`
    Path string `json:"path,omitempty"`
    Summary string `json:"summary,omitempty"`
    StatusCode int `json:"status"`
    SummaryQuery string `json:"summaryQuery,omitempty"`
    Status int `json:"status"`
    StatusQuery string `json:"statusQuery"`
    Method string `json:"method,omitempty"`
    MethodQuery string `json:"methodQuery,omitempty"`
    Timestamp int64 `json:"timestamp,omitempty"`
    Source *TCP `json:"src"`
    Destination *TCP `json:"dst"`
@@ -190,44 +192,6 @@ type Contract struct {
    Content string `json:"content"`
}

func Summarize(entry *Entry) *BaseEntry {
    return &BaseEntry{
        Id: entry.Id,
        Protocol: entry.Protocol,
        Path: entry.Path,
        Summary: entry.Summary,
        StatusCode: entry.Status,
        Method: entry.Method,
        Timestamp: entry.Timestamp,
        Source: entry.Source,
        Destination: entry.Destination,
        IsOutgoing: entry.IsOutgoing,
        Latency: entry.ElapsedTime,
        Rules: entry.Rules,
        ContractStatus: entry.ContractStatus,
    }
}

type DataUnmarshaler interface {
    UnmarshalData(*Entry) error
}

func (bed *BaseEntry) UnmarshalData(entry *Entry) error {
    bed.Protocol = entry.Protocol
    bed.Id = entry.Id
    bed.Path = entry.Path
    bed.Summary = entry.Summary
    bed.StatusCode = entry.Status
    bed.Method = entry.Method
    bed.Timestamp = entry.Timestamp
    bed.Source = entry.Source
    bed.Destination = entry.Destination
    bed.IsOutgoing = entry.IsOutgoing
    bed.Latency = entry.ElapsedTime
    bed.ContractStatus = entry.ContractStatus
    return nil
}

const (
    TABLE string = "table"
    BODY string = "body"
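This interface change is the heart of the compare: Summarize moves off the shared tap/api package and onto each protocol's Dissector, and BaseEntry now carries ready-made SummaryQuery, StatusQuery, and MethodQuery filter expressions for the UI. The AMQP and HTTP dissectors further down implement it in full; the sketch below only condenses the shape of such an implementation for a hypothetical protocol whose request map exposes "method" and "path" keys (those key names, and the stand-in dissecting type, are assumptions for illustration):

```go
package example

import (
	"fmt"

	"github.com/up9inc/mizu/tap/api"
)

type dissecting struct{} // stand-in for a protocol extension's dissector type

// Summarize builds the lightweight BaseEntry the UI lists, including the
// filter queries that clicking a column is expected to apply.
func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
	method, _ := entry.Request["method"].(string) // assumed request keys
	summary, _ := entry.Request["path"].(string)

	return &api.BaseEntry{
		Id:           entry.Id,
		Protocol:     entry.Protocol,
		Summary:      summary,
		SummaryQuery: fmt.Sprintf(`request.path == "%s"`, summary),
		Status:       0,
		StatusQuery:  "",
		Method:       method,
		MethodQuery:  fmt.Sprintf(`request.method == "%s"`, method),
		Timestamp:    entry.Timestamp,
		Source:       entry.Source,
		Destination:  entry.Destination,
		IsOutgoing:   entry.Outgoing,
		Latency:      entry.ElapsedTime,
	}
}
```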
@@ -13,4 +13,4 @@ test-pull-bin:

test-pull-expect:
	@mkdir -p expect
	@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect/amqp/\* expect
	@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect3/amqp/\* expect
@@ -219,31 +219,6 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
    request := item.Pair.Request.Payload.(map[string]interface{})
    reqDetails := request["details"].(map[string]interface{})

    summary := ""
    switch request["method"] {
    case basicMethodMap[40]:
        summary = reqDetails["exchange"].(string)
    case basicMethodMap[60]:
        summary = reqDetails["exchange"].(string)
    case exchangeMethodMap[10]:
        summary = reqDetails["exchange"].(string)
    case queueMethodMap[10]:
        summary = reqDetails["queue"].(string)
    case connectionMethodMap[10]:
        summary = fmt.Sprintf(
            "%s.%s",
            strconv.Itoa(int(reqDetails["versionMajor"].(float64))),
            strconv.Itoa(int(reqDetails["versionMinor"].(float64))),
        )
    case connectionMethodMap[50]:
        summary = reqDetails["replyText"].(string)
    case queueMethodMap[20]:
        summary = reqDetails["queue"].(string)
    case basicMethodMap[20]:
        summary = reqDetails["queue"].(string)
    }

    request["url"] = summary
    reqDetails["method"] = request["method"]
    return &api.Entry{
        Protocol: protocol,
@@ -260,17 +235,70 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
        Namespace: namespace,
        Outgoing: item.ConnectionInfo.IsOutgoing,
        Request: reqDetails,
        Method: request["method"].(string),
        Status: 0,
        Timestamp: item.Timestamp,
        StartTime: item.Pair.Request.CaptureTime,
        ElapsedTime: 0,
        Summary: summary,
        IsOutgoing: item.ConnectionInfo.IsOutgoing,
    }

}

func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
    summary := ""
    summaryQuery := ""
    method := entry.Request["method"].(string)
    methodQuery := fmt.Sprintf(`request.method == "%s"`, method)
    switch method {
    case basicMethodMap[40]:
        summary = entry.Request["exchange"].(string)
        summaryQuery = fmt.Sprintf(`request.exchange == "%s"`, summary)
    case basicMethodMap[60]:
        summary = entry.Request["exchange"].(string)
        summaryQuery = fmt.Sprintf(`request.exchange == "%s"`, summary)
    case exchangeMethodMap[10]:
        summary = entry.Request["exchange"].(string)
        summaryQuery = fmt.Sprintf(`request.exchange == "%s"`, summary)
    case queueMethodMap[10]:
        summary = entry.Request["queue"].(string)
        summaryQuery = fmt.Sprintf(`request.queue == "%s"`, summary)
    case connectionMethodMap[10]:
        versionMajor := int(entry.Request["versionMajor"].(float64))
        versionMinor := int(entry.Request["versionMinor"].(float64))
        summary = fmt.Sprintf(
            "%s.%s",
            strconv.Itoa(versionMajor),
            strconv.Itoa(versionMinor),
        )
        summaryQuery = fmt.Sprintf(`request.versionMajor == %d and request.versionMinor == %d`, versionMajor, versionMinor)
    case connectionMethodMap[50]:
        summary = entry.Request["replyText"].(string)
        summaryQuery = fmt.Sprintf(`request.replyText == "%s"`, summary)
    case queueMethodMap[20]:
        summary = entry.Request["queue"].(string)
        summaryQuery = fmt.Sprintf(`request.queue == "%s"`, summary)
    case basicMethodMap[20]:
        summary = entry.Request["queue"].(string)
        summaryQuery = fmt.Sprintf(`request.queue == "%s"`, summary)
    }

    return &api.BaseEntry{
        Id: entry.Id,
        Protocol: entry.Protocol,
        Summary: summary,
        SummaryQuery: summaryQuery,
        Status: 0,
        StatusQuery: "",
        Method: method,
        MethodQuery: methodQuery,
        Timestamp: entry.Timestamp,
        Source: entry.Source,
        Destination: entry.Destination,
        IsOutgoing: entry.Outgoing,
        Latency: entry.ElapsedTime,
        Rules: entry.Rules,
        ContractStatus: entry.ContractStatus,
    }
}

func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) {
    bodySize = 0
    representation := make(map[string]interface{})
@@ -21,14 +21,16 @@ import (
const (
    binDir = "bin"
    patternBin = "*_req.bin"
    patternDissect = "*.json"
    patternExpect = "*.json"
    msgDissecting = "Dissecting:"
    msgAnalyzing = "Analyzing:"
    msgSummarizing = "Summarizing:"
    msgRepresenting = "Representing:"
    respSuffix = "_res.bin"
    expectDir = "expect"
    dissectDir = "dissect"
    analyzeDir = "analyze"
    summarizeDir = "summarize"
    representDir = "represent"
    testUpdate = "TEST_UPDATE"
)
@@ -186,7 +188,7 @@ func TestAnalyze(t *testing.T) {
    }

    dissector := NewDissector()
    paths, err := filepath.Glob(path.Join(expectDirDissect, patternDissect))
    paths, err := filepath.Glob(path.Join(expectDirDissect, patternExpect))
    if err != nil {
        log.Fatal(err)
    }
@@ -230,6 +232,63 @@ func TestAnalyze(t *testing.T) {
    }
}

func TestSummarize(t *testing.T) {
    _, testUpdateEnabled := os.LookupEnv(testUpdate)

    expectDirAnalyze := path.Join(expectDir, analyzeDir)
    expectDirSummarize := path.Join(expectDir, summarizeDir)

    if testUpdateEnabled {
        os.RemoveAll(expectDirSummarize)
        err := os.MkdirAll(expectDirSummarize, 0775)
        assert.Nil(t, err)
    }

    dissector := NewDissector()
    paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternExpect))
    if err != nil {
        log.Fatal(err)
    }

    for _, _path := range paths {
        fmt.Printf("%s %s\n", msgSummarizing, _path)

        bytes, err := ioutil.ReadFile(_path)
        assert.Nil(t, err)

        var entries []*api.Entry
        err = json.Unmarshal(bytes, &entries)
        assert.Nil(t, err)

        var baseEntries []*api.BaseEntry
        for _, entry := range entries {
            baseEntry := dissector.Summarize(entry)
            baseEntries = append(baseEntries, baseEntry)
        }

        pathExpect := path.Join(expectDirSummarize, filepath.Base(_path))

        marshaled, err := json.Marshal(baseEntries)
        assert.Nil(t, err)

        if testUpdateEnabled {
            if len(baseEntries) > 0 {
                err = os.WriteFile(pathExpect, marshaled, 0644)
                assert.Nil(t, err)
            }
        } else {
            if _, err := os.Stat(pathExpect); errors.Is(err, os.ErrNotExist) {
                assert.Len(t, entries, 0)
            } else {
                expectedBytes, err := ioutil.ReadFile(pathExpect)
                assert.Nil(t, err)

                assert.JSONEq(t, string(expectedBytes), string(marshaled))
            }
        }
    }
}

func TestRepresent(t *testing.T) {
    _, testUpdateEnabled := os.LookupEnv(testUpdate)

@@ -243,7 +302,7 @@ func TestRepresent(t *testing.T) {
    }

    dissector := NewDissector()
    paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternDissect))
    paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternExpect))
    if err != nil {
        log.Fatal(err)
    }
@@ -13,4 +13,4 @@ test-pull-bin:

test-pull-expect:
	@mkdir -p expect
	@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect2/http/\* expect
	@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect3/http/\* expect
@@ -231,7 +231,6 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
    reqDetails["targetUri"] = reqDetails["url"]
    reqDetails["path"] = path
    reqDetails["pathSegments"] = strings.Split(path, "/")[1:]
    reqDetails["summary"] = path

    // Rearrange the maps for the querying
    reqDetails["_headers"] = reqDetails["headers"]
@@ -248,17 +247,11 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
    reqDetails["_queryStringMerged"] = mapSliceMergeRepeatedKeys(reqDetails["_queryString"].([]interface{}))
    reqDetails["queryString"] = mapSliceRebuildAsMap(reqDetails["_queryStringMerged"].([]interface{}))

    method := reqDetails["method"].(string)
    statusCode := int(resDetails["status"].(float64))
    if item.Protocol.Abbreviation == "gRPC" {
        resDetails["statusText"] = grpcStatusCodes[statusCode]
    }

    if item.Protocol.Version == "2.0" && !isRequestUpgradedH2C {
        reqDetails["url"] = path
        request["url"] = path
    }

    elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
    if elapsedTime < 0 {
        elapsedTime = 0
@@ -280,17 +273,40 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
        Outgoing: item.ConnectionInfo.IsOutgoing,
        Request: reqDetails,
        Response: resDetails,
        Method: method,
        Status: statusCode,
        Timestamp: item.Timestamp,
        StartTime: item.Pair.Request.CaptureTime,
        ElapsedTime: elapsedTime,
        Summary: path,
        IsOutgoing: item.ConnectionInfo.IsOutgoing,
        HTTPPair: string(httpPair),
    }
}

func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
    summary := entry.Request["path"].(string)
    summaryQuery := fmt.Sprintf(`request.path == "%s"`, summary)
    method := entry.Request["method"].(string)
    methodQuery := fmt.Sprintf(`request.method == "%s"`, method)
    status := int(entry.Response["status"].(float64))
    statusQuery := fmt.Sprintf(`response.status == %d`, status)

    return &api.BaseEntry{
        Id: entry.Id,
        Protocol: entry.Protocol,
        Summary: summary,
        SummaryQuery: summaryQuery,
        Status: status,
        StatusQuery: statusQuery,
        Method: method,
        MethodQuery: methodQuery,
        Timestamp: entry.Timestamp,
        Source: entry.Source,
        Destination: entry.Destination,
        IsOutgoing: entry.Outgoing,
        Latency: entry.ElapsedTime,
        Rules: entry.Rules,
        ContractStatus: entry.ContractStatus,
    }
}

func representRequest(request map[string]interface{}) (repRequest []interface{}) {
    details, _ := json.Marshal([]api.TableData{
        {
@@ -21,14 +21,16 @@ import (
const (
    binDir = "bin"
    patternBin = "*_req.bin"
    patternDissect = "*.json"
    patternExpect = "*.json"
    msgDissecting = "Dissecting:"
    msgAnalyzing = "Analyzing:"
    msgSummarizing = "Summarizing:"
    msgRepresenting = "Representing:"
    respSuffix = "_res.bin"
    expectDir = "expect"
    dissectDir = "dissect"
    analyzeDir = "analyze"
    summarizeDir = "summarize"
    representDir = "represent"
    testUpdate = "TEST_UPDATE"
)
@@ -188,7 +190,7 @@ func TestAnalyze(t *testing.T) {
    }

    dissector := NewDissector()
    paths, err := filepath.Glob(path.Join(expectDirDissect, patternDissect))
    paths, err := filepath.Glob(path.Join(expectDirDissect, patternExpect))
    if err != nil {
        log.Fatal(err)
    }
@@ -232,6 +234,63 @@ func TestAnalyze(t *testing.T) {
    }
}

func TestSummarize(t *testing.T) {
    _, testUpdateEnabled := os.LookupEnv(testUpdate)

    expectDirAnalyze := path.Join(expectDir, analyzeDir)
    expectDirSummarize := path.Join(expectDir, summarizeDir)

    if testUpdateEnabled {
        os.RemoveAll(expectDirSummarize)
        err := os.MkdirAll(expectDirSummarize, 0775)
        assert.Nil(t, err)
    }

    dissector := NewDissector()
    paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternExpect))
    if err != nil {
        log.Fatal(err)
    }

    for _, _path := range paths {
        fmt.Printf("%s %s\n", msgSummarizing, _path)

        bytes, err := ioutil.ReadFile(_path)
        assert.Nil(t, err)

        var entries []*api.Entry
        err = json.Unmarshal(bytes, &entries)
        assert.Nil(t, err)

        var baseEntries []*api.BaseEntry
        for _, entry := range entries {
            baseEntry := dissector.Summarize(entry)
            baseEntries = append(baseEntries, baseEntry)
        }

        pathExpect := path.Join(expectDirSummarize, filepath.Base(_path))

        marshaled, err := json.Marshal(baseEntries)
        assert.Nil(t, err)

        if testUpdateEnabled {
            if len(baseEntries) > 0 {
                err = os.WriteFile(pathExpect, marshaled, 0644)
                assert.Nil(t, err)
            }
        } else {
            if _, err := os.Stat(pathExpect); errors.Is(err, os.ErrNotExist) {
                assert.Len(t, entries, 0)
            } else {
                expectedBytes, err := ioutil.ReadFile(pathExpect)
                assert.Nil(t, err)

                assert.JSONEq(t, string(expectedBytes), string(marshaled))
            }
        }
    }
}

func TestRepresent(t *testing.T) {
    _, testUpdateEnabled := os.LookupEnv(testUpdate)

@@ -245,7 +304,7 @@ func TestRepresent(t *testing.T) {
    }

    dissector := NewDissector()
    paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternDissect))
    paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternExpect))
    if err != nil {
        log.Fatal(err)
    }
@@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect:
@mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect/kafka/\* expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect3/kafka/\* expect
@@ -61,83 +61,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
|
||||
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry {
|
||||
request := item.Pair.Request.Payload.(map[string]interface{})
|
||||
reqDetails := request["details"].(map[string]interface{})
|
||||
apiKey := ApiKey(reqDetails["apiKey"].(float64))
|
||||
|
||||
summary := ""
|
||||
switch apiKey {
|
||||
case Metadata:
|
||||
_topics := reqDetails["payload"].(map[string]interface{})["topics"]
|
||||
if _topics == nil {
|
||||
break
|
||||
}
|
||||
topics := _topics.([]interface{})
|
||||
for _, topic := range topics {
|
||||
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["name"].(string))
|
||||
}
|
||||
if len(summary) > 0 {
|
||||
summary = summary[:len(summary)-2]
|
||||
}
|
||||
case ApiVersions:
|
||||
summary = reqDetails["clientID"].(string)
|
||||
case Produce:
|
||||
_topics := reqDetails["payload"].(map[string]interface{})["topicData"]
|
||||
if _topics == nil {
|
||||
break
|
||||
}
|
||||
topics := _topics.([]interface{})
|
||||
for _, topic := range topics {
|
||||
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["topic"].(string))
|
||||
}
|
||||
if len(summary) > 0 {
|
||||
summary = summary[:len(summary)-2]
|
||||
}
|
||||
case Fetch:
|
||||
_topics := reqDetails["payload"].(map[string]interface{})["topics"]
|
||||
if _topics == nil {
|
||||
break
|
||||
}
|
||||
topics := _topics.([]interface{})
|
||||
for _, topic := range topics {
|
||||
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["topic"].(string))
|
||||
}
|
||||
if len(summary) > 0 {
|
||||
summary = summary[:len(summary)-2]
|
||||
}
|
||||
case ListOffsets:
|
||||
_topics := reqDetails["payload"].(map[string]interface{})["topics"]
|
||||
if _topics == nil {
|
||||
break
|
||||
}
|
||||
topics := _topics.([]interface{})
|
||||
for _, topic := range topics {
|
||||
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["name"].(string))
|
||||
}
|
||||
if len(summary) > 0 {
|
||||
summary = summary[:len(summary)-2]
|
||||
}
|
||||
case CreateTopics:
|
||||
_topics := reqDetails["payload"].(map[string]interface{})["topics"]
|
||||
if _topics == nil {
|
||||
break
|
||||
}
|
||||
topics := _topics.([]interface{})
|
||||
for _, topic := range topics {
|
||||
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["name"].(string))
|
||||
}
|
||||
if len(summary) > 0 {
|
||||
summary = summary[:len(summary)-2]
|
||||
}
|
||||
case DeleteTopics:
|
||||
if reqDetails["topicNames"] == nil {
|
||||
break
|
||||
}
|
||||
topicNames := reqDetails["topicNames"].([]string)
|
||||
for _, name := range topicNames {
|
||||
summary += fmt.Sprintf("%s, ", name)
|
||||
}
|
||||
}
|
||||
|
||||
request["url"] = summary
|
||||
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
|
||||
if elapsedTime < 0 {
|
||||
elapsedTime = 0
|
||||
@@ -158,13 +82,127 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
||||
Outgoing: item.ConnectionInfo.IsOutgoing,
|
||||
Request: reqDetails,
|
||||
Response: item.Pair.Response.Payload.(map[string]interface{})["details"].(map[string]interface{}),
|
||||
Method: apiNames[apiKey],
|
||||
Status: 0,
|
||||
Timestamp: item.Timestamp,
|
||||
StartTime: item.Pair.Request.CaptureTime,
|
||||
ElapsedTime: elapsedTime,
|
||||
Summary: summary,
|
||||
IsOutgoing: item.ConnectionInfo.IsOutgoing,
|
||||
}
|
||||
}
|
||||
|
||||
func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
|
||||
status := 0
|
||||
statusQuery := ""
|
||||
|
||||
apiKey := ApiKey(entry.Request["apiKey"].(float64))
|
||||
method := apiNames[apiKey]
|
||||
methodQuery := fmt.Sprintf("request.apiKey == %d", int(entry.Request["apiKey"].(float64)))
|
||||
|
||||
summary := ""
|
||||
summaryQuery := ""
|
||||
switch apiKey {
|
||||
case Metadata:
|
||||
_topics := entry.Request["payload"].(map[string]interface{})["topics"]
|
||||
if _topics == nil {
|
||||
break
|
||||
}
|
||||
topics := _topics.([]interface{})
|
||||
for i, topic := range topics {
|
||||
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["name"].(string))
|
||||
summaryQuery += fmt.Sprintf(`request.payload.topics[%d].name == "%s" and`, i, summary)
|
||||
}
|
||||
if len(summary) > 0 {
|
||||
summary = summary[:len(summary)-2]
|
||||
summaryQuery = summaryQuery[:len(summaryQuery)-4]
|
||||
}
|
||||
case ApiVersions:
|
||||
summary = entry.Request["clientID"].(string)
|
||||
summaryQuery = fmt.Sprintf(`request.clientID == "%s"`, summary)
|
||||
case Produce:
|
||||
_topics := entry.Request["payload"].(map[string]interface{})["topicData"]
|
||||
if _topics == nil {
|
||||
break
|
||||
}
|
||||
topics := _topics.([]interface{})
|
||||
for i, topic := range topics {
|
||||
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["topic"].(string))
|
||||
summaryQuery += fmt.Sprintf(`request.payload.topicData[%d].topic == "%s" and`, i, summary)
|
||||
}
|
||||
if len(summary) > 0 {
|
||||
summary = summary[:len(summary)-2]
|
||||
summaryQuery = summaryQuery[:len(summaryQuery)-4]
|
||||
}
|
||||
case Fetch:
|
||||
_topics := entry.Request["payload"].(map[string]interface{})["topics"]
|
||||
if _topics == nil {
|
||||
break
|
||||
}
|
||||
topics := _topics.([]interface{})
|
||||
for i, topic := range topics {
|
||||
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["topic"].(string))
|
||||
summaryQuery += fmt.Sprintf(`request.payload.topics[%d].topic == "%s" and`, i, summary)
|
||||
}
|
||||
if len(summary) > 0 {
|
||||
summary = summary[:len(summary)-2]
|
||||
summaryQuery = summaryQuery[:len(summaryQuery)-4]
|
||||
}
|
||||
case ListOffsets:
|
||||
_topics := entry.Request["payload"].(map[string]interface{})["topics"]
|
||||
if _topics == nil {
|
||||
break
|
||||
}
|
||||
topics := _topics.([]interface{})
|
||||
for i, topic := range topics {
|
||||
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["name"].(string))
|
||||
summaryQuery += fmt.Sprintf(`request.payload.topics[%d].name == "%s" and`, i, summary)
|
||||
}
|
||||
if len(summary) > 0 {
|
||||
summary = summary[:len(summary)-2]
|
||||
summaryQuery = summaryQuery[:len(summaryQuery)-4]
|
||||
}
|
||||
case CreateTopics:
|
||||
_topics := entry.Request["payload"].(map[string]interface{})["topics"]
|
||||
if _topics == nil {
|
||||
break
|
||||
}
|
||||
topics := _topics.([]interface{})
|
||||
for i, topic := range topics {
|
||||
summary += fmt.Sprintf("%s, ", topic.(map[string]interface{})["name"].(string))
|
||||
summaryQuery += fmt.Sprintf(`request.payload.topics[%d].name == "%s" and`, i, summary)
|
||||
}
|
||||
if len(summary) > 0 {
|
||||
summary = summary[:len(summary)-2]
|
||||
summaryQuery = summaryQuery[:len(summaryQuery)-4]
|
||||
}
|
||||
case DeleteTopics:
|
||||
if entry.Request["topicNames"] == nil {
|
||||
break
|
||||
}
|
||||
topicNames := entry.Request["topicNames"].([]string)
|
||||
for i, name := range topicNames {
|
||||
summary += fmt.Sprintf("%s, ", name)
|
||||
summaryQuery += fmt.Sprintf(`request.topicNames[%d] == "%s" and`, i, summary)
|
||||
}
|
||||
if len(summary) > 0 {
|
||||
summary = summary[:len(summary)-2]
|
||||
summaryQuery = summaryQuery[:len(summaryQuery)-4]
|
||||
}
|
||||
}
|
||||
|
||||
return &api.BaseEntry{
|
||||
Id: entry.Id,
|
||||
Protocol: entry.Protocol,
|
||||
Summary: summary,
|
||||
SummaryQuery: summaryQuery,
|
||||
Status: status,
|
||||
StatusQuery: statusQuery,
|
||||
Method: method,
|
||||
MethodQuery: methodQuery,
|
||||
Timestamp: entry.Timestamp,
|
||||
Source: entry.Source,
|
||||
Destination: entry.Destination,
|
||||
IsOutgoing: entry.Outgoing,
|
||||
Latency: entry.ElapsedTime,
|
||||
Rules: entry.Rules,
|
||||
ContractStatus: entry.ContractStatus,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -21,14 +21,16 @@ import (
|
||||
const (
|
||||
binDir = "bin"
|
||||
patternBin = "*_req.bin"
|
||||
patternDissect = "*.json"
|
||||
patternExpect = "*.json"
|
||||
msgDissecting = "Dissecting:"
|
||||
msgAnalyzing = "Analyzing:"
|
||||
msgSummarizing = "Summarizing:"
|
||||
msgRepresenting = "Representing:"
|
||||
respSuffix = "_res.bin"
|
||||
expectDir = "expect"
|
||||
dissectDir = "dissect"
|
||||
analyzeDir = "analyze"
|
||||
summarizeDir = "summarize"
|
||||
representDir = "represent"
|
||||
testUpdate = "TEST_UPDATE"
|
||||
)
|
||||
@@ -187,7 +189,7 @@ func TestAnalyze(t *testing.T) {
|
||||
}
|
||||
|
||||
dissector := NewDissector()
|
||||
paths, err := filepath.Glob(path.Join(expectDirDissect, patternDissect))
|
||||
paths, err := filepath.Glob(path.Join(expectDirDissect, patternExpect))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
@@ -231,6 +233,63 @@ func TestAnalyze(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSummarize(t *testing.T) {
|
||||
_, testUpdateEnabled := os.LookupEnv(testUpdate)
|
||||
|
||||
expectDirAnalyze := path.Join(expectDir, analyzeDir)
|
||||
expectDirSummarize := path.Join(expectDir, summarizeDir)
|
||||
|
||||
if testUpdateEnabled {
|
||||
os.RemoveAll(expectDirSummarize)
|
||||
err := os.MkdirAll(expectDirSummarize, 0775)
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
dissector := NewDissector()
|
||||
paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternExpect))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
for _, _path := range paths {
|
||||
fmt.Printf("%s %s\n", msgSummarizing, _path)
|
||||
|
||||
bytes, err := ioutil.ReadFile(_path)
|
||||
assert.Nil(t, err)
|
||||
|
||||
var entries []*api.Entry
|
||||
err = json.Unmarshal(bytes, &entries)
|
||||
assert.Nil(t, err)
|
||||
|
||||
var baseEntries []*api.BaseEntry
|
||||
for _, entry := range entries {
|
||||
baseEntry := dissector.Summarize(entry)
|
||||
baseEntries = append(baseEntries, baseEntry)
|
||||
}
|
||||
|
||||
pathExpect := path.Join(expectDirSummarize, filepath.Base(_path))
|
||||
|
||||
marshaled, err := json.Marshal(baseEntries)
|
||||
assert.Nil(t, err)
|
||||
|
||||
if testUpdateEnabled {
|
||||
if len(baseEntries) > 0 {
|
||||
err = os.WriteFile(pathExpect, marshaled, 0644)
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
} else {
|
||||
if _, err := os.Stat(pathExpect); errors.Is(err, os.ErrNotExist) {
|
||||
assert.Len(t, entries, 0)
|
||||
} else {
|
||||
expectedBytes, err := ioutil.ReadFile(pathExpect)
|
||||
assert.Nil(t, err)
|
||||
|
||||
assert.JSONEq(t, string(expectedBytes), string(marshaled))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRepresent(t *testing.T) {
|
||||
_, testUpdateEnabled := os.LookupEnv(testUpdate)
|
||||
|
||||
@@ -244,7 +303,7 @@ func TestRepresent(t *testing.T) {
|
||||
}
|
||||
|
||||
dissector := NewDissector()
|
||||
paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternDissect))
|
||||
paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternExpect))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -13,4 +13,4 @@ test-pull-bin:
test-pull-expect:
@mkdir -p expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect/redis/\* expect
@[ "${skipexpect}" ] && echo "Skipping downloading expected JSONs" || gsutil -o 'GSUtil:parallel_process_count=5' -o 'GSUtil:parallel_thread_count=5' -m cp -r gs://static.up9.io/mizu/test-pcap/expect3/redis/\* expect
@@ -65,17 +65,6 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
||||
reqDetails := request["details"].(map[string]interface{})
|
||||
resDetails := response["details"].(map[string]interface{})
|
||||
|
||||
method := ""
|
||||
if reqDetails["command"] != nil {
|
||||
method = reqDetails["command"].(string)
|
||||
}
|
||||
|
||||
summary := ""
|
||||
if reqDetails["key"] != nil {
|
||||
summary = reqDetails["key"].(string)
|
||||
}
|
||||
|
||||
request["url"] = summary
|
||||
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
|
||||
if elapsedTime < 0 {
|
||||
elapsedTime = 0
|
||||
@@ -96,17 +85,50 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
|
||||
Outgoing: item.ConnectionInfo.IsOutgoing,
|
||||
Request: reqDetails,
|
||||
Response: resDetails,
|
||||
Method: method,
|
||||
Status: 0,
|
||||
Timestamp: item.Timestamp,
|
||||
StartTime: item.Pair.Request.CaptureTime,
|
||||
ElapsedTime: elapsedTime,
|
||||
Summary: summary,
|
||||
IsOutgoing: item.ConnectionInfo.IsOutgoing,
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (d dissecting) Summarize(entry *api.Entry) *api.BaseEntry {
|
||||
status := 0
|
||||
statusQuery := ""
|
||||
|
||||
method := ""
|
||||
methodQuery := ""
|
||||
if entry.Request["command"] != nil {
|
||||
method = entry.Request["command"].(string)
|
||||
methodQuery = fmt.Sprintf(`request.command == "%s"`, method)
|
||||
}
|
||||
|
||||
summary := ""
|
||||
summaryQuery := ""
|
||||
if entry.Request["key"] != nil {
|
||||
summary = entry.Request["key"].(string)
|
||||
summaryQuery = fmt.Sprintf(`request.key == "%s"`, summary)
|
||||
}
|
||||
|
||||
return &api.BaseEntry{
|
||||
Id: entry.Id,
|
||||
Protocol: entry.Protocol,
|
||||
Summary: summary,
|
||||
SummaryQuery: summaryQuery,
|
||||
Status: status,
|
||||
StatusQuery: statusQuery,
|
||||
Method: method,
|
||||
MethodQuery: methodQuery,
|
||||
Timestamp: entry.Timestamp,
|
||||
Source: entry.Source,
|
||||
Destination: entry.Destination,
|
||||
IsOutgoing: entry.Outgoing,
|
||||
Latency: entry.ElapsedTime,
|
||||
Rules: entry.Rules,
|
||||
ContractStatus: entry.ContractStatus,
|
||||
}
|
||||
}
|
||||
|
||||
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) {
|
||||
bodySize = 0
|
||||
representation := make(map[string]interface{})
|
||||
|
||||
@@ -22,14 +22,16 @@ import (
|
||||
const (
|
||||
binDir = "bin"
|
||||
patternBin = "*_req.bin"
|
||||
patternDissect = "*.json"
|
||||
patternExpect = "*.json"
|
||||
msgDissecting = "Dissecting:"
|
||||
msgAnalyzing = "Analyzing:"
|
||||
msgSummarizing = "Summarizing:"
|
||||
msgRepresenting = "Representing:"
|
||||
respSuffix = "_res.bin"
|
||||
expectDir = "expect"
|
||||
dissectDir = "dissect"
|
||||
analyzeDir = "analyze"
|
||||
summarizeDir = "summarize"
|
||||
representDir = "represent"
|
||||
testUpdate = "TEST_UPDATE"
|
||||
)
|
||||
@@ -187,7 +189,7 @@ func TestAnalyze(t *testing.T) {
|
||||
}
|
||||
|
||||
dissector := NewDissector()
|
||||
paths, err := filepath.Glob(path.Join(expectDirDissect, patternDissect))
|
||||
paths, err := filepath.Glob(path.Join(expectDirDissect, patternExpect))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
@@ -231,6 +233,63 @@ func TestAnalyze(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSummarize(t *testing.T) {
|
||||
_, testUpdateEnabled := os.LookupEnv(testUpdate)
|
||||
|
||||
expectDirAnalyze := path.Join(expectDir, analyzeDir)
|
||||
expectDirSummarize := path.Join(expectDir, summarizeDir)
|
||||
|
||||
if testUpdateEnabled {
|
||||
os.RemoveAll(expectDirSummarize)
|
||||
err := os.MkdirAll(expectDirSummarize, 0775)
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
dissector := NewDissector()
|
||||
paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternExpect))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
for _, _path := range paths {
|
||||
fmt.Printf("%s %s\n", msgSummarizing, _path)
|
||||
|
||||
bytes, err := ioutil.ReadFile(_path)
|
||||
assert.Nil(t, err)
|
||||
|
||||
var entries []*api.Entry
|
||||
err = json.Unmarshal(bytes, &entries)
|
||||
assert.Nil(t, err)
|
||||
|
||||
var baseEntries []*api.BaseEntry
|
||||
for _, entry := range entries {
|
||||
baseEntry := dissector.Summarize(entry)
|
||||
baseEntries = append(baseEntries, baseEntry)
|
||||
}
|
||||
|
||||
pathExpect := path.Join(expectDirSummarize, filepath.Base(_path))
|
||||
|
||||
marshaled, err := json.Marshal(baseEntries)
|
||||
assert.Nil(t, err)
|
||||
|
||||
if testUpdateEnabled {
|
||||
if len(baseEntries) > 0 {
|
||||
err = os.WriteFile(pathExpect, marshaled, 0644)
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
} else {
|
||||
if _, err := os.Stat(pathExpect); errors.Is(err, os.ErrNotExist) {
|
||||
assert.Len(t, entries, 0)
|
||||
} else {
|
||||
expectedBytes, err := ioutil.ReadFile(pathExpect)
|
||||
assert.Nil(t, err)
|
||||
|
||||
assert.JSONEq(t, string(expectedBytes), string(marshaled))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRepresent(t *testing.T) {
|
||||
_, testUpdateEnabled := os.LookupEnv(testUpdate)
|
||||
|
||||
@@ -244,7 +303,7 @@ func TestRepresent(t *testing.T) {
|
||||
}
|
||||
|
||||
dissector := NewDissector()
|
||||
paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternDissect))
|
||||
paths, err := filepath.Glob(path.Join(expectDirAnalyze, patternExpect))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -52,7 +52,6 @@ var snaplen = flag.Int("s", 65536, "Snap length (number of bytes max to read per
var tstype = flag.String("timestamp_type", "", "Type of timestamps to use")
var promisc = flag.Bool("promisc", true, "Set promiscuous mode")
var staleTimeoutSeconds = flag.Int("staletimout", 120, "Max time in seconds to keep connections which don't transmit data")
var pids = flag.String("pids", "", "A comma separated list of PIDs to capture their network namespaces")
var servicemesh = flag.Bool("servicemesh", false, "Record decrypted traffic if the cluster is configured with a service mesh and with mtls")
var tls = flag.Bool("tls", false, "Enable TLS tapper")

@@ -190,7 +189,7 @@ func initializePacketSources() error {
}

var err error
if packetSourceManager, err = source.NewPacketSourceManager(*procfs, *pids, *fname, *iface, *servicemesh, tapTargets, behaviour); err != nil {
if packetSourceManager, err = source.NewPacketSourceManager(*procfs, *fname, *iface, *servicemesh, tapTargets, behaviour); err != nil {
return err
} else {
packetSourceManager.ReadPackets(!*nodefrag, mainPacketInputChan)
@@ -248,7 +247,7 @@ func startTlsTapper(extension *api.Extension, outputItems chan *api.OutputChanne
tls := tlstapper.TlsTapper{}
tlsPerfBufferSize := os.Getpagesize() * 100

if err := tls.Init(tlsPerfBufferSize); err != nil {
if err := tls.Init(tlsPerfBufferSize, *procfs, extension); err != nil {
tlstapper.LogError(err)
return
}
@@ -272,6 +271,5 @@ func startTlsTapper(extension *api.Extension, outputItems chan *api.OutputChanne
OutputChannel: outputItems,
}

poller := tlstapper.NewTlsPoller(&tls, extension)
go poller.Poll(extension, emitter, options)
go tls.Poll(emitter, options)
}
83
tap/source/netns_packet_source.go
Normal file
@@ -0,0 +1,83 @@
|
||||
package source
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"runtime"
|
||||
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
"github.com/vishvananda/netns"
|
||||
)
|
||||
|
||||
func newNetnsPacketSource(procfs string, pid string,
|
||||
interfaceName string, behaviour TcpPacketSourceBehaviour) (*tcpPacketSource, error) {
|
||||
nsh, err := netns.GetFromPath(fmt.Sprintf("%s/%s/ns/net", procfs, pid))
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Errorf("Unable to get netns of pid %s - %w", pid, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
src, err := newPacketSourceFromNetnsHandle(pid, nsh, interfaceName, behaviour)
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Errorf("Error starting netns packet source for %s - %w", pid, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return src, nil
|
||||
}
|
||||
|
||||
func newPacketSourceFromNetnsHandle(pid string, nsh netns.NsHandle, interfaceName string,
|
||||
behaviour TcpPacketSourceBehaviour) (*tcpPacketSource, error) {
|
||||
|
||||
done := make(chan *tcpPacketSource)
|
||||
errors := make(chan error)
|
||||
|
||||
go func(done chan<- *tcpPacketSource) {
|
||||
// Setting a netns should be done from a dedicated OS thread.
|
||||
//
|
||||
// goroutines are not really OS threads, we try to mimic the issue by
|
||||
// locking the OS thread to this goroutine
|
||||
//
|
||||
runtime.LockOSThread()
|
||||
defer runtime.UnlockOSThread()
|
||||
|
||||
oldnetns, err := netns.Get()
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Errorf("Unable to get netns of current thread %w", err)
|
||||
errors <- err
|
||||
return
|
||||
}
|
||||
|
||||
if err := netns.Set(nsh); err != nil {
|
||||
logger.Log.Errorf("Unable to set netns of pid %s - %w", pid, err)
|
||||
errors <- err
|
||||
return
|
||||
}
|
||||
|
||||
name := fmt.Sprintf("netns-%s-%s", pid, interfaceName)
|
||||
src, err := newTcpPacketSource(name, "", interfaceName, behaviour)
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Errorf("Error listening to PID %s - %w", pid, err)
|
||||
errors <- err
|
||||
return
|
||||
}
|
||||
|
||||
if err := netns.Set(oldnetns); err != nil {
|
||||
logger.Log.Errorf("Unable to set back netns of current thread %w", err)
|
||||
errors <- err
|
||||
return
|
||||
}
|
||||
|
||||
done <- src
|
||||
}(done)
|
||||
|
||||
select {
|
||||
case err := <-errors:
|
||||
return nil, err
|
||||
case source := <-done:
|
||||
return source, nil
|
||||
}
|
||||
}
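As an editorial aside (not part of the commit): the new file above switches network namespaces from a pinned OS thread and restores the original namespace afterwards. A condensed sketch of that lock–switch–restore pattern, assuming github.com/vishvananda/netns; a helper of this exact shape does not exist in the repository:

package source

import (
	"runtime"

	"github.com/vishvananda/netns"
)

// runInNetns is a hypothetical helper that runs fn inside the target network
// namespace, following the same pattern as newPacketSourceFromNetnsHandle above.
func runInNetns(target netns.NsHandle, fn func() error) error {
	errc := make(chan error, 1)

	go func() {
		// netns.Set only affects the calling OS thread, so pin this goroutine to it.
		runtime.LockOSThread()
		defer runtime.UnlockOSThread()

		orig, err := netns.Get()
		if err != nil {
			errc <- err
			return
		}
		// Restore the thread's original namespace before the thread is reused.
		defer func() { _ = netns.Set(orig) }()

		if err := netns.Set(target); err != nil {
			errc <- err
			return
		}

		errc <- fn()
	}()

	return <-errc
}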
@@ -2,109 +2,46 @@ package source
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
"github.com/vishvananda/netns"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
)
|
||||
|
||||
const bpfFilterMaxPods = 150
|
||||
const hostSourcePid = "0"
|
||||
|
||||
type PacketSourceManager struct {
|
||||
sources []*tcpPacketSource
|
||||
sources map[string]*tcpPacketSource
|
||||
}
|
||||
|
||||
func NewPacketSourceManager(procfs string, pids string, filename string, interfaceName string,
|
||||
func NewPacketSourceManager(procfs string, filename string, interfaceName string,
|
||||
mtls bool, pods []v1.Pod, behaviour TcpPacketSourceBehaviour) (*PacketSourceManager, error) {
|
||||
sources := make([]*tcpPacketSource, 0)
|
||||
sources, err := createHostSource(sources, filename, interfaceName, behaviour)
|
||||
|
||||
hostSource, err := newHostPacketSource(filename, interfaceName, behaviour)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sources = createSourcesFromPids(sources, procfs, pids, interfaceName, behaviour)
|
||||
sources = createSourcesFromEnvoy(sources, mtls, procfs, pods, interfaceName, behaviour)
|
||||
sources = createSourcesFromLinkerd(sources, mtls, procfs, pods, interfaceName, behaviour)
|
||||
|
||||
return &PacketSourceManager{
|
||||
sources: sources,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func createHostSource(sources []*tcpPacketSource, filename string, interfaceName string,
|
||||
behaviour TcpPacketSourceBehaviour) ([]*tcpPacketSource, error) {
|
||||
hostSource, err := newHostPacketSource(filename, interfaceName, behaviour)
|
||||
|
||||
if err != nil {
|
||||
return sources, err
|
||||
sourceManager := &PacketSourceManager{
|
||||
sources: map[string]*tcpPacketSource{
|
||||
hostSourcePid: hostSource,
|
||||
},
|
||||
}
|
||||
|
||||
return append(sources, hostSource), nil
|
||||
}
|
||||
|
||||
func createSourcesFromPids(sources []*tcpPacketSource, procfs string, pids string,
|
||||
interfaceName string, behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
|
||||
if pids == "" {
|
||||
return sources
|
||||
}
|
||||
|
||||
netnsSources := newNetnsPacketSources(procfs, strings.Split(pids, ","), interfaceName, behaviour)
|
||||
sources = append(sources, netnsSources...)
|
||||
return sources
|
||||
}
|
||||
|
||||
func createSourcesFromEnvoy(sources []*tcpPacketSource, mtls bool, procfs string, pods []v1.Pod,
|
||||
interfaceName string, behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
|
||||
if !mtls {
|
||||
return sources
|
||||
}
|
||||
|
||||
envoyPids, err := discoverRelevantEnvoyPids(procfs, pods)
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Warningf("Unable to discover envoy pids - %v", err)
|
||||
return sources
|
||||
}
|
||||
|
||||
netnsSources := newNetnsPacketSources(procfs, envoyPids, interfaceName, behaviour)
|
||||
sources = append(sources, netnsSources...)
|
||||
|
||||
return sources
|
||||
}
|
||||
|
||||
func createSourcesFromLinkerd(sources []*tcpPacketSource, mtls bool, procfs string, pods []v1.Pod,
|
||||
interfaceName string, behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
|
||||
if !mtls {
|
||||
return sources
|
||||
}
|
||||
|
||||
linkerdPids, err := discoverRelevantLinkerdPids(procfs, pods)
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Warningf("Unable to discover linkerd pids - %v", err)
|
||||
return sources
|
||||
}
|
||||
|
||||
netnsSources := newNetnsPacketSources(procfs, linkerdPids, interfaceName, behaviour)
|
||||
sources = append(sources, netnsSources...)
|
||||
|
||||
return sources
|
||||
sourceManager.UpdatePods(mtls, procfs, pods, interfaceName, behaviour)
|
||||
return sourceManager, nil
|
||||
}
|
||||
|
||||
func newHostPacketSource(filename string, interfaceName string,
|
||||
behaviour TcpPacketSourceBehaviour) (*tcpPacketSource, error) {
|
||||
var name string
|
||||
|
||||
if filename == "" {
|
||||
name = fmt.Sprintf("host-%v", interfaceName)
|
||||
name = fmt.Sprintf("host-%s", interfaceName)
|
||||
} else {
|
||||
name = fmt.Sprintf("file-%v", filename)
|
||||
name = fmt.Sprintf("file-%s", filename)
|
||||
}
|
||||
|
||||
source, err := newTcpPacketSource(name, filename, interfaceName, behaviour)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -112,90 +49,93 @@ func newHostPacketSource(filename string, interfaceName string,
|
||||
return source, nil
|
||||
}
|
||||
|
||||
func newNetnsPacketSources(procfs string, pids []string, interfaceName string,
|
||||
behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
|
||||
result := make([]*tcpPacketSource, 0)
|
||||
|
||||
for _, pidstr := range pids {
|
||||
pid, err := strconv.Atoi(pidstr)
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Errorf("Invalid PID: %v - %v", pid, err)
|
||||
continue
|
||||
}
|
||||
|
||||
nsh, err := netns.GetFromPath(fmt.Sprintf("%v/%v/ns/net", procfs, pid))
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Errorf("Unable to get netns of pid %v - %v", pid, err)
|
||||
continue
|
||||
}
|
||||
|
||||
src, err := newNetnsPacketSource(pid, nsh, interfaceName, behaviour)
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Errorf("Error starting netns packet source for %v - %v", pid, err)
|
||||
continue
|
||||
}
|
||||
|
||||
result = append(result, src)
|
||||
func (m *PacketSourceManager) UpdatePods(mtls bool, procfs string, pods []v1.Pod,
|
||||
interfaceName string, behaviour TcpPacketSourceBehaviour) {
|
||||
if mtls {
|
||||
m.updateMtlsPods(procfs, pods, interfaceName, behaviour)
|
||||
}
|
||||
|
||||
return result
|
||||
m.setBPFFilter(pods)
|
||||
}
|
||||
|
||||
func newNetnsPacketSource(pid int, nsh netns.NsHandle, interfaceName string,
|
||||
behaviour TcpPacketSourceBehaviour) (*tcpPacketSource, error) {
|
||||
func (m *PacketSourceManager) updateMtlsPods(procfs string, pods []v1.Pod,
|
||||
interfaceName string, behaviour TcpPacketSourceBehaviour) {
|
||||
|
||||
done := make(chan *tcpPacketSource)
|
||||
errors := make(chan error)
|
||||
relevantPids := m.getRelevantPids(procfs, pods)
|
||||
logger.Log.Infof("Updating mtls pods (new: %v) (current: %v)", relevantPids, m.sources)
|
||||
|
||||
go func(done chan<- *tcpPacketSource) {
|
||||
// Setting a netns should be done from a dedicated OS thread.
|
||||
//
|
||||
// goroutines are not really OS threads, we try to mimic the issue by
|
||||
// locking the OS thread to this goroutine
|
||||
//
|
||||
runtime.LockOSThread()
|
||||
defer runtime.UnlockOSThread()
|
||||
|
||||
oldnetns, err := netns.Get()
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Errorf("Unable to get netns of current thread %v", err)
|
||||
errors <- err
|
||||
return
|
||||
for pid, src := range m.sources {
|
||||
if _, ok := relevantPids[pid]; !ok {
|
||||
src.close()
|
||||
delete(m.sources, pid)
|
||||
}
|
||||
}
|
||||
|
||||
if err := netns.Set(nsh); err != nil {
|
||||
logger.Log.Errorf("Unable to set netns of pid %v - %v", pid, err)
|
||||
errors <- err
|
||||
return
|
||||
for pid := range relevantPids {
|
||||
if _, ok := m.sources[pid]; !ok {
|
||||
source, err := newNetnsPacketSource(procfs, pid, interfaceName, behaviour)
|
||||
|
||||
if err == nil {
|
||||
m.sources[pid] = source
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
name := fmt.Sprintf("netns-%v-%v", pid, interfaceName)
|
||||
src, err := newTcpPacketSource(name, "", interfaceName, behaviour)
|
||||
func (m *PacketSourceManager) getRelevantPids(procfs string, pods []v1.Pod) map[string]bool {
|
||||
relevantPids := make(map[string]bool)
|
||||
relevantPids[hostSourcePid] = true
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Errorf("Error listening to PID %v - %v", pid, err)
|
||||
errors <- err
|
||||
return
|
||||
if envoyPids, err := discoverRelevantEnvoyPids(procfs, pods); err != nil {
|
||||
logger.Log.Warningf("Unable to discover envoy pids - %w", err)
|
||||
} else {
|
||||
for _, pid := range envoyPids {
|
||||
relevantPids[pid] = true
|
||||
}
|
||||
}
|
||||
|
||||
if err := netns.Set(oldnetns); err != nil {
|
||||
logger.Log.Errorf("Unable to set back netns of current thread %v", err)
|
||||
errors <- err
|
||||
return
|
||||
if linkerdPids, err := discoverRelevantLinkerdPids(procfs, pods); err != nil {
|
||||
logger.Log.Warningf("Unable to discover linkerd pids - %w", err)
|
||||
} else {
|
||||
for _, pid := range linkerdPids {
|
||||
relevantPids[pid] = true
|
||||
}
|
||||
}
|
||||
|
||||
done <- src
|
||||
}(done)
|
||||
return relevantPids
|
||||
}
|
||||
|
||||
select {
|
||||
case err := <-errors:
|
||||
return nil, err
|
||||
case source := <-done:
|
||||
return source, nil
|
||||
func buildBPFExpr(pods []v1.Pod) string {
|
||||
hostsFilter := make([]string, 0)
|
||||
|
||||
for _, pod := range pods {
|
||||
hostsFilter = append(hostsFilter, fmt.Sprintf("host %s", pod.Status.PodIP))
|
||||
}
|
||||
|
||||
return fmt.Sprintf("%s and port not 443", strings.Join(hostsFilter, " or "))
|
||||
}
|
||||
|
||||
func (m *PacketSourceManager) setBPFFilter(pods []v1.Pod) {
|
||||
if len(pods) == 0 {
|
||||
logger.Log.Info("No pods provided, skipping pcap bpf filter")
|
||||
return
|
||||
}
|
||||
|
||||
var expr string
|
||||
|
||||
if len(pods) > bpfFilterMaxPods {
|
||||
logger.Log.Info("Too many pods for setting ebpf filter %d, setting just not 443", len(pods))
|
||||
expr = "port not 443"
|
||||
} else {
|
||||
expr = buildBPFExpr(pods)
|
||||
}
|
||||
|
||||
logger.Log.Infof("Setting pcap bpf filter %s", expr)
|
||||
|
||||
for pid, src := range m.sources {
|
||||
if err := src.setBPFFilter(expr); err != nil {
|
||||
logger.Log.Warningf("Error setting bpf filter for %s %v - %w", pid, src, err)
|
||||
}
|
||||
}
|
||||
}
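For reference, a minimal sketch of the filter string that setBPFFilter receives from buildBPFExpr above; the pod IPs are hypothetical and the function is rewritten to take plain strings so the sketch needs no Kubernetes types:

package main

import (
	"fmt"
	"strings"
)

// Same expression as buildBPFExpr above, operating on plain IP strings.
func buildBPFExpr(podIPs []string) string {
	hostsFilter := make([]string, 0, len(podIPs))
	for _, ip := range podIPs {
		hostsFilter = append(hostsFilter, fmt.Sprintf("host %s", ip))
	}
	return fmt.Sprintf("%s and port not 443", strings.Join(hostsFilter, " or "))
}

func main() {
	// Hypothetical pod IPs, for illustration only.
	fmt.Println(buildBPFExpr([]string{"10.0.0.12", "10.0.0.13"}))
	// Prints: host 10.0.0.12 or host 10.0.0.13 and port not 443
}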
@@ -98,6 +98,14 @@ func newTcpPacketSource(name, filename string, interfaceName string,
return result, nil
}

func (source *tcpPacketSource) String() string {
return source.name
}

func (source *tcpPacketSource) setBPFFilter(expr string) (err error) {
return source.handle.SetBPFFilter(expr)
}

func (source *tcpPacketSource) close() {
if source.handle != nil {
source.handle.Close()
@@ -1,4 +1,4 @@
FROM alpine:3
FROM alpine:3.14

RUN apk --no-cache update && apk --no-cache add clang llvm libbpf-dev go linux-headers
@@ -10,7 +10,7 @@ Copyright (C) UP9 Inc.
#define FLAGS_IS_CLIENT_BIT (1 << 0)
#define FLAGS_IS_READ_BIT (1 << 1)

// The same struct can be found in Chunk.go
// The same struct can be found in chunk.go
//
// Be careful when editing, alignment and padding should be exactly the same in go/c.
//
@@ -8,20 +8,20 @@ import (
"github.com/go-errors/errors"
)

const FLAGS_IS_CLIENT_BIT int32 = (1 << 0)
const FLAGS_IS_READ_BIT int32 = (1 << 1)
const FLAGS_IS_CLIENT_BIT uint32 = (1 << 0)
const FLAGS_IS_READ_BIT uint32 = (1 << 1)

// The same struct can be found in maps.h
//
// Be careful when editing, alignment and padding should be exactly the same in go/c.
//
type tlsChunk struct {
Pid int32
Tgid int32
Len int32
Recorded int32
Fd int32
Flags int32
Pid uint32
Tgid uint32
Len uint32
Recorded uint32
Fd uint32
Flags uint32
Address [16]byte
Data [4096]byte
}
@@ -68,3 +68,7 @@ func (c *tlsChunk) isWrite() bool {
func (c *tlsChunk) getRecordedData() []byte {
return c.Data[:c.Recorded]
}

func (c *tlsChunk) isRequest() bool {
return (c.isClient() && c.isWrite()) || (c.isServer() && c.isRead())
}
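A quick way to sanity-check the go/c layout warning in the comment above is to compare the encoded size of the Go struct against the C side (6 four-byte fields + a 16-byte address + 4096 data bytes = 4136 bytes). This is only an editorial sketch with a local copy of the new struct layout; the real type lives unexported in the tlstapper package:

package main

import (
	"encoding/binary"
	"fmt"
)

// Copy of the updated tlsChunk layout shown above, for the size check only.
type tlsChunk struct {
	Pid      uint32
	Tgid     uint32
	Len      uint32
	Recorded uint32
	Fd       uint32
	Flags    uint32
	Address  [16]byte
	Data     [4096]byte
}

func main() {
	// binary.Size reports the wire size used when binary.Read parses perf records.
	fmt.Println(binary.Size(tlsChunk{})) // 6*4 + 16 + 4096 = 4136
}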
102
tap/tlstapper/sockfd_info.go
Normal file
@@ -0,0 +1,102 @@
|
||||
package tlstapper
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"os"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/go-errors/errors"
|
||||
)
|
||||
|
||||
var socketInodeRegex = regexp.MustCompile(`socket:\[(\d+)\]`)
|
||||
|
||||
const (
|
||||
SRC_ADDRESS_FILED_INDEX = 1
|
||||
DST_ADDRESS_FILED_INDEX = 2
|
||||
INODE_FILED_INDEX = 9
|
||||
)
|
||||
|
||||
// This file helps to extract Ip and Port out of a Socket file descriptor.
|
||||
//
|
||||
// The equivalent bash commands are:
|
||||
//
|
||||
// > ls -l /proc/<pid>/fd/<fd>
|
||||
// Output something like "socket:[1234]" for sockets - 1234 is the inode of the socket
|
||||
// > cat /proc/<pid>/net/tcp | grep <inode>
|
||||
// Output a line per ipv4 socket, the 9th field is the inode of the socket
|
||||
// The 1st and 2nd fields are the source and dest ip and ports in a Hex format
|
||||
// 0100007F:50 is 127.0.0.1:80
|
||||
|
||||
func getAddressBySockfd(procfs string, pid uint32, fd uint32, src bool) (net.IP, uint16, error) {
|
||||
inode, err := getSocketInode(procfs, pid, fd)
|
||||
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
tcppath := fmt.Sprintf("%s/%d/net/tcp", procfs, pid)
|
||||
tcp, err := ioutil.ReadFile(tcppath)
|
||||
|
||||
if err != nil {
|
||||
return nil, 0, errors.Wrap(err, 0)
|
||||
}
|
||||
|
||||
for _, line := range strings.Split(string(tcp), "\n") {
|
||||
parts := strings.Fields(line)
|
||||
|
||||
if len(parts) < 10 {
|
||||
continue
|
||||
}
|
||||
|
||||
if inode == parts[INODE_FILED_INDEX] {
|
||||
if src {
|
||||
return parseHexAddress(parts[SRC_ADDRESS_FILED_INDEX])
|
||||
} else {
|
||||
return parseHexAddress(parts[DST_ADDRESS_FILED_INDEX])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil, 0, errors.Errorf("address not found [pid: %d] [sockfd: %d] [inode: %s]", pid, fd, inode)
|
||||
}
|
||||
|
||||
func getSocketInode(procfs string, pid uint32, fd uint32) (string, error) {
|
||||
fdlinkPath := fmt.Sprintf("%s/%d/fd/%d", procfs, pid, fd)
|
||||
fdlink, err := os.Readlink(fdlinkPath)
|
||||
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, 0)
|
||||
}
|
||||
|
||||
tokens := socketInodeRegex.FindStringSubmatch(fdlink)
|
||||
|
||||
if tokens == nil || len(tokens) < 1 {
|
||||
return "", errors.Errorf("socket inode not found [pid: %d] [sockfd: %d] [link: %s]", pid, fd, fdlink)
|
||||
}
|
||||
|
||||
return tokens[1], nil
|
||||
}
|
||||
|
||||
// Format looks like 0100007F:50 for 127.0.0.1:80
|
||||
//
|
||||
func parseHexAddress(addr string) (net.IP, uint16, error) {
|
||||
addrParts := strings.Split(addr, ":")
|
||||
|
||||
port, err := strconv.ParseUint(addrParts[1], 16, 16)
|
||||
|
||||
if err != nil {
|
||||
return nil, 0, errors.Wrap(err, 0)
|
||||
}
|
||||
|
||||
ip, err := strconv.ParseUint(addrParts[0], 16, 32)
|
||||
|
||||
if err != nil {
|
||||
return nil, 0, errors.Wrap(err, 0)
|
||||
}
|
||||
|
||||
return net.IP{uint8(ip), uint8(ip >> 8), uint8(ip >> 16), uint8(ip >> 24)}, uint16(port), nil
|
||||
}
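As an illustrative aside (not part of the commit), applying the parsing described in the comments above to the documented example 0100007F:50 yields 127.0.0.1:80; a standalone sketch:

package main

import (
	"fmt"
	"net"
	"strconv"
	"strings"
)

// Parses a /proc/net/tcp style hex address such as "0100007F:50".
func parseHexAddress(addr string) (net.IP, uint16, error) {
	parts := strings.Split(addr, ":")
	port, err := strconv.ParseUint(parts[1], 16, 16)
	if err != nil {
		return nil, 0, err
	}
	ip, err := strconv.ParseUint(parts[0], 16, 32)
	if err != nil {
		return nil, 0, err
	}
	// The kernel stores the IPv4 address little-endian, hence the byte reversal.
	return net.IP{uint8(ip), uint8(ip >> 8), uint8(ip >> 16), uint8(ip >> 24)}, uint16(port), nil
}

func main() {
	ip, port, _ := parseHexAddress("0100007F:50")
	fmt.Printf("%s:%d\n", ip, port) // 127.0.0.1:80
}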
@@ -2,7 +2,6 @@ package tlstapper
import (
"debug/elf"
"fmt"

"github.com/go-errors/errors"
"github.com/up9inc/mizu/shared/logger"
@@ -47,7 +46,7 @@ func findBaseAddress(sslElf *elf.File, sslLibraryPath string) (uint64, error) {
}
}

return 0, errors.New(fmt.Sprintf("Program header not found in %v", sslLibraryPath))
return 0, errors.Errorf("Program header not found in %v", sslLibraryPath)
}

func findSslOffsets(sslElf *elf.File, base uint64) (sslOffsets, error) {
@@ -2,43 +2,64 @@ package tlstapper
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"fmt"
|
||||
"net"
|
||||
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/cilium/ebpf/perf"
|
||||
"github.com/go-errors/errors"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
const UNKNOWN_PORT uint16 = 80
|
||||
const UNKNOWN_HOST string = "127.0.0.1"
|
||||
|
||||
type tlsPoller struct {
|
||||
tls *TlsTapper
|
||||
readers map[string]*tlsReader
|
||||
closedReaders chan string
|
||||
reqResMatcher api.RequestResponseMatcher
|
||||
chunksReader *perf.Reader
|
||||
extension *api.Extension
|
||||
procfs string
|
||||
}
|
||||
|
||||
func NewTlsPoller(tls *TlsTapper, extension *api.Extension) *tlsPoller {
|
||||
func newTlsPoller(tls *TlsTapper, extension *api.Extension, procfs string) *tlsPoller {
|
||||
return &tlsPoller{
|
||||
tls: tls,
|
||||
readers: make(map[string]*tlsReader),
|
||||
closedReaders: make(chan string, 100),
|
||||
reqResMatcher: extension.Dissector.NewResponseRequestMatcher(),
|
||||
extension: extension,
|
||||
chunksReader: nil,
|
||||
procfs: procfs,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *tlsPoller) Poll(extension *api.Extension,
|
||||
emitter api.Emitter, options *api.TrafficFilteringOptions) {
|
||||
func (p *tlsPoller) init(bpfObjects *tlsTapperObjects, bufferSize int) error {
|
||||
var err error
|
||||
|
||||
p.chunksReader, err = perf.NewReader(bpfObjects.ChunksBuffer, bufferSize)
|
||||
|
||||
if err != nil {
|
||||
return errors.Wrap(err, 0)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *tlsPoller) close() error {
|
||||
return p.chunksReader.Close()
|
||||
}
|
||||
|
||||
func (p *tlsPoller) poll(emitter api.Emitter, options *api.TrafficFilteringOptions) {
|
||||
chunks := make(chan *tlsChunk)
|
||||
|
||||
go p.tls.pollPerf(chunks)
|
||||
go p.pollChunksPerfBuffer(chunks)
|
||||
|
||||
for {
|
||||
select {
|
||||
@@ -47,7 +68,7 @@ func (p *tlsPoller) Poll(extension *api.Extension,
|
||||
return
|
||||
}
|
||||
|
||||
if err := p.handleTlsChunk(chunk, extension, emitter, options); err != nil {
|
||||
if err := p.handleTlsChunk(chunk, p.extension, emitter, options); err != nil {
|
||||
LogError(err)
|
||||
}
|
||||
case key := <-p.closedReaders:
|
||||
@@ -56,6 +77,41 @@ func (p *tlsPoller) Poll(extension *api.Extension,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *tlsPoller) pollChunksPerfBuffer(chunks chan<- *tlsChunk) {
|
||||
logger.Log.Infof("Start polling for tls events")
|
||||
|
||||
for {
|
||||
record, err := p.chunksReader.Read()
|
||||
|
||||
if err != nil {
|
||||
close(chunks)
|
||||
|
||||
if errors.Is(err, perf.ErrClosed) {
|
||||
return
|
||||
}
|
||||
|
||||
LogError(errors.Errorf("Error reading chunks from tls perf, aborting TLS! %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
if record.LostSamples != 0 {
|
||||
logger.Log.Infof("Buffer is full, dropped %d chunks", record.LostSamples)
|
||||
continue
|
||||
}
|
||||
|
||||
buffer := bytes.NewReader(record.RawSample)
|
||||
|
||||
var chunk tlsChunk
|
||||
|
||||
if err := binary.Read(buffer, binary.LittleEndian, &chunk); err != nil {
|
||||
LogError(errors.Errorf("Error parsing chunk %v", err))
|
||||
continue
|
||||
}
|
||||
|
||||
chunks <- &chunk
|
||||
}
|
||||
}
|
||||
|
||||
func (p *tlsPoller) handleTlsChunk(chunk *tlsChunk, extension *api.Extension,
|
||||
emitter api.Emitter, options *api.TrafficFilteringOptions) error {
|
||||
ip, port, err := chunk.getAddress()
|
||||
@@ -75,7 +131,7 @@ func (p *tlsPoller) handleTlsChunk(chunk *tlsChunk, extension *api.Extension,
|
||||
reader.chunks <- chunk
|
||||
|
||||
if os.Getenv("MIZU_VERBOSE_TLS_TAPPER") == "true" {
|
||||
logTls(chunk, ip, port)
|
||||
p.logTls(chunk, ip, port)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -92,10 +148,9 @@ func (p *tlsPoller) startNewTlsReader(chunk *tlsChunk, ip net.IP, port uint16, k
|
||||
},
|
||||
}
|
||||
|
||||
isRequest := (chunk.isClient() && chunk.isWrite()) || (chunk.isServer() && chunk.isRead())
|
||||
tcpid := buildTcpId(isRequest, ip, port)
|
||||
tcpid := p.buildTcpId(chunk, ip, port)
|
||||
|
||||
go dissect(extension, reader, isRequest, &tcpid, emitter, options, p.reqResMatcher)
|
||||
go dissect(extension, reader, chunk.isRequest(), &tcpid, emitter, options, p.reqResMatcher)
|
||||
return reader
|
||||
}
|
||||
|
||||
@@ -120,27 +175,36 @@ func buildTlsKey(chunk *tlsChunk, ip net.IP, port uint16) string {
|
||||
return fmt.Sprintf("%v:%v-%v:%v", chunk.isClient(), chunk.isRead(), ip, port)
|
||||
}
|
||||
|
||||
func buildTcpId(isRequest bool, ip net.IP, port uint16) api.TcpID {
|
||||
if isRequest {
|
||||
func (p *tlsPoller) buildTcpId(chunk *tlsChunk, ip net.IP, port uint16) api.TcpID {
|
||||
myIp, myPort, err := getAddressBySockfd(p.procfs, chunk.Pid, chunk.Fd, chunk.isClient())
|
||||
|
||||
if err != nil {
|
||||
// May happen if the socket already closed, very likely to happen for localhost
|
||||
//
|
||||
myIp = api.UnknownIp
|
||||
myPort = api.UnknownPort
|
||||
}
|
||||
|
||||
if chunk.isRequest() {
|
||||
return api.TcpID{
|
||||
SrcIP: UNKNOWN_HOST,
|
||||
SrcIP: myIp.String(),
|
||||
DstIP: ip.String(),
|
||||
SrcPort: strconv.Itoa(int(UNKNOWN_PORT)),
|
||||
DstPort: strconv.FormatInt(int64(port), 10),
|
||||
SrcPort: strconv.FormatUint(uint64(myPort), 10),
|
||||
DstPort: strconv.FormatUint(uint64(port), 10),
|
||||
Ident: "",
|
||||
}
|
||||
} else {
|
||||
return api.TcpID{
|
||||
SrcIP: ip.String(),
|
||||
DstIP: UNKNOWN_HOST,
|
||||
SrcPort: strconv.FormatInt(int64(port), 10),
|
||||
DstPort: strconv.Itoa(int(UNKNOWN_PORT)),
|
||||
DstIP: myIp.String(),
|
||||
SrcPort: strconv.FormatUint(uint64(port), 10),
|
||||
DstPort: strconv.FormatUint(uint64(myPort), 10),
|
||||
Ident: "",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func logTls(chunk *tlsChunk, ip net.IP, port uint16) {
|
||||
func (p *tlsPoller) logTls(chunk *tlsChunk, ip net.IP, port uint16) {
|
||||
var flagsStr string
|
||||
|
||||
if chunk.isClient() {
|
||||
@@ -155,8 +219,13 @@ func logTls(chunk *tlsChunk, ip net.IP, port uint16) {
|
||||
flagsStr += "W"
|
||||
}
|
||||
|
||||
srcIp, srcPort, _ := getAddressBySockfd(p.procfs, chunk.Pid, chunk.Fd, true)
|
||||
dstIp, dstPort, _ := getAddressBySockfd(p.procfs, chunk.Pid, chunk.Fd, false)
|
||||
|
||||
str := strings.ReplaceAll(strings.ReplaceAll(string(chunk.Data[0:chunk.Recorded]), "\n", " "), "\r", "")
|
||||
|
||||
logger.Log.Infof("PID: %v (tid: %v) (fd: %v) (client: %v) (addr: %v:%v) (recorded %v out of %v) - %v - %v",
|
||||
chunk.Pid, chunk.Tgid, chunk.Fd, flagsStr, ip, port, chunk.Recorded, chunk.Len, str, hex.EncodeToString(chunk.Data[0:chunk.Recorded]))
|
||||
logger.Log.Infof("PID: %v (tid: %v) (fd: %v) (client: %v) (addr: %v:%v) (fdaddr %v:%v>%v:%v) (recorded %v out of %v) - %v - %v",
|
||||
chunk.Pid, chunk.Tgid, chunk.Fd, flagsStr, ip, port,
|
||||
srcIp, srcPort, dstIp, dstPort,
|
||||
chunk.Recorded, chunk.Len, str, hex.EncodeToString(chunk.Data[0:chunk.Recorded]))
|
||||
}
|
||||
|
||||
@@ -132,8 +132,22 @@ func extractCgroup(lines []string) string {
return ""
}

// A cgroup entry in /proc/<pid>/cgroup may look something like
//
// /system.slice/docker-<ID>.scope
// /system.slice/containerd-<ID>.scope
// /kubepods.slice/kubepods-burstable.slice/kubepods-burstable-pod3beae8e0_164d_4689_a087_efd902d8c2ab.slice/docker-<ID>.scope
// /kubepods/besteffort/pod7709c1d5-447c-428f-bed9-8ddec35c93f4/<ID>
//
// This function extracts the <ID> out of the cgroup path; the <ID> should match
// the "Container ID:" field when running kubectl describe pod <POD>
//
func normalizeCgroup(cgrouppath string) string {
basename := strings.TrimSpace(path.Base(cgrouppath))

if strings.Contains(basename, "-") {
basename = basename[strings.Index(basename, "-") + 1:]
}

if strings.Contains(basename, ".") {
return strings.TrimSuffix(basename, filepath.Ext(basename))
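As a hedged illustration of the normalization described above (the container IDs here are made up, and the final no-extension return is assumed since the diff is cut off at that point):

package main

import (
	"fmt"
	"path"
	"path/filepath"
	"strings"
)

// Condensed copy of normalizeCgroup as shown above.
func normalizeCgroup(cgrouppath string) string {
	basename := strings.TrimSpace(path.Base(cgrouppath))

	if strings.Contains(basename, "-") {
		basename = basename[strings.Index(basename, "-")+1:]
	}

	if strings.Contains(basename, ".") {
		return strings.TrimSuffix(basename, filepath.Ext(basename))
	}

	// Assumed fallback for IDs with no prefix or extension.
	return basename
}

func main() {
	// "abc123" stands in for a real container ID.
	fmt.Println(normalizeCgroup("/system.slice/docker-abc123.scope"))                                   // abc123
	fmt.Println(normalizeCgroup("/kubepods/besteffort/pod7709c1d5-447c-428f-bed9-8ddec35c93f4/abc123")) // abc123
}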
@@ -1,13 +1,10 @@
|
||||
package tlstapper
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
|
||||
"github.com/cilium/ebpf/perf"
|
||||
"github.com/cilium/ebpf/rlimit"
|
||||
"github.com/go-errors/errors"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
//go:generate go run github.com/cilium/ebpf/cmd/bpf2go tlsTapper bpf/tls_tapper.c -- -O2 -g -D__TARGET_ARCH_x86
|
||||
@@ -16,10 +13,10 @@ type TlsTapper struct {
|
||||
bpfObjects tlsTapperObjects
|
||||
syscallHooks syscallHooks
|
||||
sslHooksStructs []sslHooks
|
||||
reader *perf.Reader
|
||||
poller *tlsPoller
|
||||
}
|
||||
|
||||
func (t *TlsTapper) Init(bufferSize int) error {
|
||||
func (t *TlsTapper) Init(bufferSize int, procfs string, extension *api.Extension) error {
|
||||
logger.Log.Infof("Initializing tls tapper (bufferSize: %v)", bufferSize)
|
||||
|
||||
if err := setupRLimit(); err != nil {
|
||||
@@ -27,55 +24,23 @@ func (t *TlsTapper) Init(bufferSize int) error {
|
||||
}
|
||||
|
||||
t.bpfObjects = tlsTapperObjects{}
|
||||
|
||||
if err := loadTlsTapperObjects(&t.bpfObjects, nil); err != nil {
|
||||
return errors.Wrap(err, 0)
|
||||
}
|
||||
|
||||
t.syscallHooks = syscallHooks{}
|
||||
|
||||
if err := t.syscallHooks.installSyscallHooks(&t.bpfObjects); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
t.sslHooksStructs = make([]sslHooks, 0)
|
||||
|
||||
return t.initChunksReader(bufferSize)
|
||||
t.poller = newTlsPoller(t, extension, procfs)
|
||||
return t.poller.init(&t.bpfObjects, bufferSize)
|
||||
}
|
||||
|
||||
func (t *TlsTapper) pollPerf(chunks chan<- *tlsChunk) {
|
||||
logger.Log.Infof("Start polling for tls events")
|
||||
|
||||
for {
|
||||
record, err := t.reader.Read()
|
||||
|
||||
if err != nil {
|
||||
close(chunks)
|
||||
|
||||
if errors.Is(err, perf.ErrClosed) {
|
||||
return
|
||||
}
|
||||
|
||||
LogError(errors.Errorf("Error reading chunks from tls perf, aborting TLS! %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
if record.LostSamples != 0 {
|
||||
logger.Log.Infof("Buffer is full, dropped %d chunks", record.LostSamples)
|
||||
continue
|
||||
}
|
||||
|
||||
buffer := bytes.NewReader(record.RawSample)
|
||||
|
||||
var chunk tlsChunk
|
||||
|
||||
if err := binary.Read(buffer, binary.LittleEndian, &chunk); err != nil {
|
||||
LogError(errors.Errorf("Error parsing chunk %v", err))
|
||||
continue
|
||||
}
|
||||
|
||||
chunks <- &chunk
|
||||
}
|
||||
func (t *TlsTapper) Poll(emitter api.Emitter, options *api.TrafficFilteringOptions) {
|
||||
t.poller.poll(emitter, options)
|
||||
}
|
||||
|
||||
func (t *TlsTapper) GlobalTap(sslLibrary string) error {
|
||||
@@ -118,7 +83,7 @@ func (t *TlsTapper) Close() []error {
|
||||
errors = append(errors, sslHooks.close()...)
|
||||
}
|
||||
|
||||
if err := t.reader.Close(); err != nil {
|
||||
if err := t.poller.close(); err != nil {
|
||||
errors = append(errors, err)
|
||||
}
|
||||
|
||||
@@ -135,18 +100,6 @@ func setupRLimit() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *TlsTapper) initChunksReader(bufferSize int) error {
|
||||
var err error
|
||||
|
||||
t.reader, err = perf.NewReader(t.bpfObjects.ChunksBuffer, bufferSize)
|
||||
|
||||
if err != nil {
|
||||
return errors.Wrap(err, 0)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *TlsTapper) tapPid(pid uint32, sslLibrary string) error {
|
||||
logger.Log.Infof("Tapping TLS (pid: %v) (sslLibrary: %v)", pid, sslLibrary)
|
||||
|
||||
|
||||
@@ -1,5 +1,4 @@
// Code generated by bpf2go; DO NOT EDIT.
//go:build arm64be || armbe || mips || mips64 || mips64p32 || ppc64 || s390 || s390x || sparc || sparc64
// +build arm64be armbe mips mips64 mips64p32 ppc64 s390 s390x sparc sparc64

package tlstapper

Binary file not shown.
@@ -1,5 +1,4 @@
// Code generated by bpf2go; DO NOT EDIT.
//go:build 386 || amd64 || amd64p32 || arm || arm64 || mips64le || mips64p32le || mipsle || ppc64le || riscv64
// +build 386 amd64 amd64p32 arm arm64 mips64le mips64p32le mipsle ppc64le riscv64

package tlstapper

Binary file not shown.
@@ -119,7 +119,7 @@ export const EntryDetailed = () => {
bodySize={entryData.bodySize}
elapsedTime={entryData.data.elapsedTime}
/>}
{entryData && <EntrySummary entry={entryData.data}/>}
{entryData && <EntrySummary entry={entryData.base}/>}
<>
{entryData && <EntryViewer
representation={entryData.representation}
@@ -25,9 +25,12 @@ interface TCPInterface {
interface Entry {
proto: ProtocolInterface,
method?: string,
methodQuery?: string,
summary: string,
summaryQuery: string,
id: number,
status?: number;
statusQuery?: string;
timestamp: Date;
src: TCPInterface,
dst: TCPInterface,
@@ -152,10 +155,10 @@ export const EntryItem: React.FC<EntryProps> = ({entry, style, headingMode}) =>
horizontal={false}
/> : null}
{isStatusCodeEnabled && <div>
<StatusCode statusCode={entry.status}/>
<StatusCode statusCode={entry.status} statusQuery={entry.statusQuery}/>
</div>}
<div className={styles.endpointServiceContainer} style={{paddingLeft: endpointServiceContainer}}>
<Summary method={entry.method} summary={entry.summary}/>
<Summary method={entry.method} methodQuery={entry.methodQuery} summary={entry.summary} summaryQuery={entry.summaryQuery}/>
<div className={styles.resolvedName}>
<Queryable
query={`src.name == "${entry.src.name}"`}
@@ -10,14 +10,15 @@ export enum StatusCodeClassification {

interface EntryProps {
statusCode: number
statusQuery: string
}

const StatusCode: React.FC<EntryProps> = ({statusCode}) => {
const StatusCode: React.FC<EntryProps> = ({statusCode, statusQuery}) => {

const classification = getClassification(statusCode)

return <Queryable
query={`response.status == ${statusCode}`}
query={statusQuery}
displayIconOnMouseOver={true}
flipped={true}
iconStyle={{marginTop: "40px", paddingLeft: "10px"}}
@@ -5,14 +5,16 @@ import Queryable from "./Queryable";

interface SummaryProps {
method: string
methodQuery: string
summary: string
summaryQuery: string
}

export const Summary: React.FC<SummaryProps> = ({method, summary}) => {
export const Summary: React.FC<SummaryProps> = ({method, methodQuery, summary, summaryQuery}) => {

return <div className={styles.container}>
{method && <Queryable
query={`method == "${method}"`}
query={methodQuery}
className={`${miscStyles.protocol} ${miscStyles.method}`}
displayIconOnMouseOver={true}
style={{whiteSpace: "nowrap"}}
@@ -24,7 +26,7 @@ export const Summary: React.FC<SummaryProps> = ({method, summary}) => {
</span>
</Queryable>}
{summary && <Queryable
query={`summary == "${summary}"`}
query={summaryQuery}
displayIconOnMouseOver={true}
flipped={true}
iconStyle={{zIndex:"5",position:"relative",right:"14px"}}
@@ -52,12 +52,12 @@ export default class Api {
}

getEntry = async (id, query) => {
const response = await this.client.get(`/entries/${id}?query=${query}`);
const response = await this.client.get(`/entries/${id}?query=${encodeURIComponent(query)}`);
return response.data;
}

fetchEntries = async (leftOff, direction, query, limit, timeoutMs) => {
const response = await this.client.get(`/entries/?leftOff=${leftOff}&direction=${direction}&query=${query}&limit=${limit}&timeoutMs=${timeoutMs}`).catch(function (thrown) {
const response = await this.client.get(`/entries/?leftOff=${leftOff}&direction=${direction}&query=${encodeURIComponent(query)}&limit=${limit}&timeoutMs=${timeoutMs}`).catch(function (thrown) {
console.error(thrown.message);
return {};
});