Compare commits

..

10 Commits

Author SHA1 Message Date
Nimrod Gilboa Markevich
e09748baff Fix spelling in docs (#745)
Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
2022-02-01 17:59:43 +02:00
Adam Kol
20fcc8e163 Cypress: prettier code (#743)
Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
2022-02-01 17:46:07 +02:00
Igor Gov
fd64c1bb14 Refactor mizuserver module rename (#742)
Co-authored-by: Igor Gov <igor.govorov1@gmail.com>
2022-02-01 17:19:24 +02:00
Adam Kol
3a51ca21eb cypress body test fix (#741) 2022-02-01 15:12:01 +02:00
Igor Gov
0a2e55f7bc Fix lint errors file not found (#740)
Co-authored-by: Igor Gov <igor.govorov1@gmail.com>
2022-02-01 14:17:56 +02:00
Adam Kol
f7c200b821 Cypress: first step of the sanity test (#727)
Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
Co-authored-by: Roee Gadot <roee.gadot@up9.com>
2022-02-01 12:43:45 +02:00
M. Mert Yıldıran
0846a98bc1 Fix the Redis dissector (#739)
Co-authored-by: Igor Gov <iggvrv@gmail.com>
2022-02-01 13:24:56 +03:00
Igor Gov
602225bb36 Adding go lint to more modules (#738) 2022-02-01 12:08:55 +02:00
Adam Kol
c0f6f2a049 Cypress: Right side body test (#722) 2022-02-01 11:12:41 +02:00
Igor Gov
7846d812c1 Fix: minor error handling bug (#737)
Co-authored-by: Igor Gov <igor.govorov1@gmail.com>
2022-02-01 09:21:17 +02:00
80 changed files with 466 additions and 1449 deletions

View File

@@ -15,6 +15,9 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: '^1.16'
- name: Install dependencies
run: |
@@ -48,3 +51,40 @@ jobs:
version: latest
working-directory: cli
args: --timeout=3m
- name: Go lint - acceptanceTests
uses: golangci/golangci-lint-action@v2
with:
version: latest
working-directory: acceptanceTests
args: --timeout=3m
- name: Go lint - tap/api
uses: golangci/golangci-lint-action@v2
with:
version: latest
working-directory: tap/api
- name: Go lint - tap/extensions/amqp
uses: golangci/golangci-lint-action@v2
with:
version: latest
working-directory: tap/extensions/amqp
- name: Go lint - tap/extensions/http
uses: golangci/golangci-lint-action@v2
with:
version: latest
working-directory: tap/extensions/http
- name: Go lint - tap/extensions/kafka
uses: golangci/golangci-lint-action@v2
with:
version: latest
working-directory: tap/extensions/kafka
- name: Go lint - tap/extensions/redis
uses: golangci/golangci-lint-action@v2
with:
version: latest
working-directory: tap/extensions/redis

View File

@@ -72,10 +72,10 @@ ARG SEM_VER=0.0.0
WORKDIR /app/agent-build
RUN go build -ldflags="-extldflags=-static -s -w \
-X 'mizuserver/pkg/version.GitCommitHash=${COMMIT_HASH}' \
-X 'mizuserver/pkg/version.Branch=${GIT_BRANCH}' \
-X 'mizuserver/pkg/version.BuildTimestamp=${BUILD_TIMESTAMP}' \
-X 'mizuserver/pkg/version.SemVer=${SEM_VER}'" -o mizuagent .
-X 'github.com/up9inc/mizu/agent/pkg/version.GitCommitHash=${COMMIT_HASH}' \
-X 'github.com/up9inc/mizu/agent/pkg/version.Branch=${GIT_BRANCH}' \
-X 'github.com/up9inc/mizu/agent/pkg/version.BuildTimestamp=${BUILD_TIMESTAMP}' \
-X 'github.com/up9inc/mizu/agent/pkg/version.SemVer=${SEM_VER}'" -o mizuagent .
# Download Basenine executable, verify the sha1sum
ADD https://github.com/up9inc/basenine/releases/download/v0.4.13/basenine_linux_${GOARCH} ./basenine_linux_${GOARCH}

View File

@@ -4,7 +4,7 @@
"viewportHeight": 1080,
"video": false,
"screenshotOnRunFailure": false,
"defaultCommandTimeout": 5000,
"defaultCommandTimeout": 6000,
"testFiles": [
"tests/GuiPort.js",
"tests/MultipleNamespaces.js",

View File

@@ -3,6 +3,7 @@ import {resizeToHugeMizu, resizeToNormalMizu} from "../testHelpers/TrafficHelper
const greenFilterColor = 'rgb(210, 250, 210)';
const redFilterColor = 'rgb(250, 214, 220)';
const refreshWaitTimeout = 10000;
const bodyJsonClass = '.hljs';
it('opening mizu', function () {
cy.visit(Cypress.env('testUrl'));
@@ -25,6 +26,37 @@ it('filtering guide check', function () {
cy.get('#modal-modal-title').should('not.exist');
});
it('right side sanity test', function () {
cy.get('#entryDetailedTitleBodySize').then(sizeTopLine => {
const sizeOnTopLine = sizeTopLine.text().replace(' B', '');
cy.contains('Response').click();
cy.contains('Body Size (bytes)').parent().next().then(size => {
const bodySizeByDetails = size.text();
expect(sizeOnTopLine).to.equal(bodySizeByDetails, 'The body size in the top line should match the details in the response');
if (parseInt(bodySizeByDetails) < 0) {
throw new Error(`The body size cannot be negative. got the size: ${bodySizeByDetails}`)
}
cy.get('#entryDetailedTitleElapsedTime').then(timeInMs => {
const time = timeInMs.text();
if (time < '0ms') {
throw new Error(`The time in the top line cannot be negative ${time}`);
}
cy.get('#rightSideContainer [title="Status Code"]').then(status => {
const statusCode = status.text();
cy.contains('Status').parent().next().then(statusInDetails => {
const statusCodeInDetails = statusInDetails.text();
expect(statusCode).to.equal(statusCodeInDetails, 'The status code in the top line should match the status code in details');
});
});
});
});
});
});
checkIllegalFilter('invalid filter');
checkFilter({
@@ -161,11 +193,12 @@ function checkFilter(filterDetails){
if (!applyByEnter)
cy.get('[type="submit"]').click();
// only one entry in DOM after filtering, checking all four checks on it
// only one entry in DOM after filtering, checking all checks on it
leftTextCheck(totalEntries - 1, leftSidePath, leftSideExpectedText);
leftOnHoverCheck(totalEntries - 1, leftSidePath, name);
rightTextCheck(rightSidePath, rightSideExpectedText);
rightOnHoverCheck(rightSidePath, name);
checkRightSideResponseBody();
cy.get('[title="Fetch old records"]').click();
resizeToHugeMizu();
@@ -196,6 +229,7 @@ function deeperChcek(leftSidePath, rightSidePath, filterName, leftSideExpectedTe
cy.get(`#list #entry-${entryNum}`).click();
rightTextCheck(rightSidePath, rightSideExpectedText);
rightOnHoverCheck(rightSidePath, filterName);
checkRightSideResponseBody();
});
}
@@ -216,3 +250,88 @@ function rightOnHoverCheck(path, expectedText) {
cy.get(`.TrafficPage-Container > :nth-child(2) ${path}`).trigger('mouseover');
cy.get(`.TrafficPage-Container > :nth-child(2) .Queryable-Tooltip`).should('have.text', expectedText);
}
function checkRightSideResponseBody() {
cy.contains('Response').click();
clickCheckbox('Decode Base64');
cy.get(`${bodyJsonClass}`).then(value => {
const encodedBody = value.text();
cy.log(encodedBody);
const decodedBody = atob(encodedBody);
const responseBody = JSON.parse(decodedBody);
const expectdJsonBody = {
args: RegExp({}),
url: RegExp('http://.*/get'),
headers: {
"User-Agent": RegExp('[REDACTED]'),
"Accept-Encoding": RegExp('gzip'),
"X-Forwarded-Uri": RegExp('/api/v1/namespaces/.*/services/.*/proxy/get')
}
};
expect(responseBody.args).to.match(expectdJsonBody.args);
expect(responseBody.url).to.match(expectdJsonBody.url);
expect(responseBody.headers['User-Agent']).to.match(expectdJsonBody.headers['User-Agent']);
expect(responseBody.headers['Accept-Encoding']).to.match(expectdJsonBody.headers['Accept-Encoding']);
expect(responseBody.headers['X-Forwarded-Uri']).to.match(expectdJsonBody.headers['X-Forwarded-Uri']);
cy.get(`${bodyJsonClass}`).should('have.text', encodedBody);
clickCheckbox('Decode Base64');
cy.get(`${bodyJsonClass} > `).its('length').should('be.gt', 1).then(linesNum => {
cy.get(`${bodyJsonClass} > >`).its('length').should('be.gt', linesNum).then(jsonItemsNum => {
checkPrettyAndLineNums(jsonItemsNum, decodedBody);
clickCheckbox('Line numbers');
checkPrettyOrNothing(jsonItemsNum, decodedBody);
clickCheckbox('Pretty');
checkPrettyOrNothing(jsonItemsNum, decodedBody);
clickCheckbox('Line numbers');
checkOnlyLineNumberes(jsonItemsNum, decodedBody);
});
});
});
}
function clickCheckbox(type) {
cy.contains(`${type}`).prev().children().click();
}
function checkPrettyAndLineNums(jsonItemsLen, decodedBody) {
decodedBody = decodedBody.replaceAll(' ', '');
cy.get(`${bodyJsonClass} >`).then(elements => {
const lines = Object.values(elements);
lines.forEach((line, index) => {
if (line.getAttribute) {
const cleanLine = getCleanLine(line);
const currentLineFromDecodedText = decodedBody.substring(0, cleanLine.length);
expect(cleanLine).to.equal(currentLineFromDecodedText, `expected the text in line number ${index + 1} to match the text that generated by the base64 decoding`)
decodedBody = decodedBody.substring(cleanLine.length);
}
});
});
}
function getCleanLine(lineElement) {
return (lineElement.innerText.substring(0, lineElement.innerText.length - 1)).replaceAll(' ', '');
}
function checkPrettyOrNothing(jsonItems, decodedBody) {
cy.get(`${bodyJsonClass} > `).should('have.length', jsonItems).then(text => {
const json = text.text();
expect(json).to.equal(decodedBody);
});
}
function checkOnlyLineNumberes(jsonItems, decodedText) {
cy.get(`${bodyJsonClass} >`).should('have.length', 1).and('have.text', decodedText);
cy.get(`${bodyJsonClass} > >`).should('have.length', jsonItems)
}

View File

@@ -3,7 +3,6 @@ package acceptanceTests
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
@@ -11,12 +10,10 @@ import (
"os/exec"
"path"
"strings"
"sync"
"syscall"
"testing"
"time"
"github.com/gorilla/websocket"
"github.com/up9inc/mizu/shared"
)
@@ -28,7 +25,6 @@ const (
defaultServiceName = "httpbin"
defaultEntriesCount = 50
waitAfterTapPodsReady = 3 * time.Second
cleanCommandTimeout = 1 * time.Minute
)
type PodDescriptor struct {
@@ -36,18 +32,6 @@ type PodDescriptor struct {
Namespace string
}
func isPodDescriptorInPodArray(pods []map[string]interface{}, podDescriptor PodDescriptor) bool {
for _, pod := range pods {
podNamespace := pod["namespace"].(string)
podName := pod["name"].(string)
if podDescriptor.Namespace == podNamespace && strings.Contains(podName, podDescriptor.Name) {
return true
}
}
return false
}
func getCliPath() (string, error) {
dir, filePathErr := os.Getwd()
if filePathErr != nil {
@@ -84,10 +68,6 @@ func getApiServerUrl(port uint16) string {
return fmt.Sprintf("http://localhost:%v", port)
}
func getWebSocketUrl(port uint16) string {
return fmt.Sprintf("ws://localhost:%v/ws", port)
}
func getDefaultCommandArgs() []string {
setFlag := "--set"
telemetry := "telemetry=false"
@@ -130,20 +110,6 @@ func getDefaultConfigCommandArgs() []string {
return append([]string{configCommand}, defaultCmdArgs...)
}
func getDefaultCleanCommandArgs() []string {
cleanCommand := "clean"
defaultCmdArgs := getDefaultCommandArgs()
return append([]string{cleanCommand}, defaultCmdArgs...)
}
func getDefaultViewCommandArgs() []string {
viewCommand := "view"
defaultCmdArgs := getDefaultCommandArgs()
return append([]string{viewCommand}, defaultCmdArgs...)
}
func runCypressTests(t *testing.T, cypressRunCmd string) {
cypressCmd := exec.Command("bash", "-c", cypressRunCmd)
t.Logf("running command: %v", cypressCmd.String())
@@ -268,36 +234,6 @@ func executeHttpPostRequestWithHeaders(url string, headers map[string]string, bo
return executeHttpRequest(response, requestErr)
}
func runMizuClean() error {
cliPath, err := getCliPath()
if err != nil {
return err
}
cleanCmdArgs := getDefaultCleanCommandArgs()
cleanCmd := exec.Command(cliPath, cleanCmdArgs...)
commandDone := make(chan error)
go func() {
if err := cleanCmd.Run(); err != nil {
commandDone <- err
}
commandDone <- nil
}()
select {
case err = <-commandDone:
if err != nil {
return err
}
case <-time.After(cleanCommandTimeout):
return errors.New("clean command timed out")
}
return nil
}
func cleanupCommand(cmd *exec.Cmd) error {
if err := cmd.Process.Signal(syscall.SIGQUIT); err != nil {
return err
@@ -310,17 +246,6 @@ func cleanupCommand(cmd *exec.Cmd) error {
return nil
}
func getPods(tapStatusInterface interface{}) ([]map[string]interface{}, error) {
tapPodsInterface := tapStatusInterface.([]interface{})
var pods []map[string]interface{}
for _, podInterface := range tapPodsInterface {
pods = append(pods, podInterface.(map[string]interface{}))
}
return pods, nil
}
func getLogsPath() (string, error) {
dir, filePathErr := os.Getwd()
if filePathErr != nil {
@@ -331,77 +256,6 @@ func getLogsPath() (string, error) {
return logsPath, nil
}
// waitTimeout waits for the waitgroup for the specified max timeout.
// Returns true if waiting timed out.
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
channel := make(chan struct{})
go func() {
defer close(channel)
wg.Wait()
}()
select {
case <-channel:
return false // completed normally
case <-time.After(timeout):
return true // timed out
}
}
// checkEntriesAtLeast checks whether the number of entries greater than or equal to n
func checkEntriesAtLeast(entries []map[string]interface{}, n int) error {
if len(entries) < n {
return fmt.Errorf("Unexpected entries result - Expected more than %d entries", n-1)
}
return nil
}
// getDBEntries retrieves the entries from the database before the given timestamp.
// Also limits the results according to the limit parameter.
// Timeout for the WebSocket connection is defined by the timeout parameter.
func getDBEntries(timestamp int64, limit int, timeout time.Duration) (entries []map[string]interface{}, err error) {
query := fmt.Sprintf("timestamp < %d and limit(%d)", timestamp, limit)
webSocketUrl := getWebSocketUrl(defaultApiServerPort)
var connection *websocket.Conn
connection, _, err = websocket.DefaultDialer.Dial(webSocketUrl, nil)
if err != nil {
return
}
defer connection.Close()
handleWSConnection := func(wg *sync.WaitGroup) {
defer wg.Done()
for {
_, message, err := connection.ReadMessage()
if err != nil {
return
}
var data map[string]interface{}
if err = json.Unmarshal([]byte(message), &data); err != nil {
return
}
if data["messageType"] == "entry" {
entries = append(entries, data)
}
}
}
err = connection.WriteMessage(websocket.TextMessage, []byte(query))
if err != nil {
return
}
var wg sync.WaitGroup
go handleWSConnection(&wg)
wg.Add(1)
waitTimeout(&wg, timeout)
return
}
func Contains(slice []string, containsValue string) bool {
for _, sliceValue := range slice {
if sliceValue == containsValue {

View File

@@ -1,4 +1,4 @@
module mizuserver
module github.com/up9inc/mizu/agent
go 1.16

View File

@@ -6,17 +6,6 @@ import (
"flag"
"fmt"
"io/ioutil"
"mizuserver/pkg/api"
"mizuserver/pkg/config"
"mizuserver/pkg/controllers"
"mizuserver/pkg/elastic"
"mizuserver/pkg/middlewares"
"mizuserver/pkg/models"
"mizuserver/pkg/oas"
"mizuserver/pkg/routes"
"mizuserver/pkg/servicemap"
"mizuserver/pkg/up9"
"mizuserver/pkg/utils"
"net/http"
"os"
"os/signal"
@@ -26,6 +15,21 @@ import (
"syscall"
"time"
"github.com/up9inc/mizu/agent/pkg/middlewares"
"github.com/up9inc/mizu/agent/pkg/models"
"github.com/up9inc/mizu/agent/pkg/oas"
"github.com/up9inc/mizu/agent/pkg/routes"
"github.com/up9inc/mizu/agent/pkg/servicemap"
"github.com/up9inc/mizu/agent/pkg/up9"
"github.com/up9inc/mizu/agent/pkg/utils"
"github.com/up9inc/mizu/agent/pkg/elastic"
"github.com/up9inc/mizu/agent/pkg/controllers"
"github.com/up9inc/mizu/agent/pkg/api"
"github.com/up9inc/mizu/agent/pkg/config"
v1 "k8s.io/api/core/v1"
"github.com/antelman107/net-wait-go/wait"

View File

@@ -5,22 +5,23 @@ import (
"context"
"encoding/json"
"fmt"
"mizuserver/pkg/elastic"
"mizuserver/pkg/har"
"mizuserver/pkg/holder"
"mizuserver/pkg/providers"
"os"
"path"
"sort"
"strings"
"time"
"mizuserver/pkg/servicemap"
"github.com/up9inc/mizu/agent/pkg/elastic"
"github.com/up9inc/mizu/agent/pkg/har"
"github.com/up9inc/mizu/agent/pkg/holder"
"github.com/up9inc/mizu/agent/pkg/providers"
"mizuserver/pkg/models"
"mizuserver/pkg/oas"
"mizuserver/pkg/resolver"
"mizuserver/pkg/utils"
"github.com/up9inc/mizu/agent/pkg/servicemap"
"github.com/up9inc/mizu/agent/pkg/models"
"github.com/up9inc/mizu/agent/pkg/oas"
"github.com/up9inc/mizu/agent/pkg/resolver"
"github.com/up9inc/mizu/agent/pkg/utils"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"

View File

@@ -3,11 +3,12 @@ package api
import (
"encoding/json"
"fmt"
"mizuserver/pkg/models"
"net/http"
"sync"
"time"
"github.com/up9inc/mizu/agent/pkg/models"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
basenine "github.com/up9inc/basenine/client/go"
@@ -40,7 +41,7 @@ var connectedWebsocketIdCounter = 0
func init() {
websocketUpgrader.CheckOrigin = func(r *http.Request) bool { return true } // like cors for web socket
connectedWebsockets = make(map[int]*SocketConnection, 0)
connectedWebsockets = make(map[int]*SocketConnection)
}
func WebSocketRoutes(app *gin.Engine, eventHandlers EventHandlers, startTime int64) {

View File

@@ -3,12 +3,13 @@ package api
import (
"encoding/json"
"fmt"
"mizuserver/pkg/models"
"mizuserver/pkg/providers"
"mizuserver/pkg/providers/tappers"
"mizuserver/pkg/up9"
"sync"
"github.com/up9inc/mizu/agent/pkg/models"
"github.com/up9inc/mizu/agent/pkg/providers"
"github.com/up9inc/mizu/agent/pkg/providers/tappers"
"github.com/up9inc/mizu/agent/pkg/up9"
tapApi "github.com/up9inc/mizu/tap/api"
"github.com/up9inc/mizu/shared"

View File

@@ -2,21 +2,22 @@ package controllers
import (
"context"
"net/http"
"regexp"
"time"
"github.com/gin-gonic/gin"
"github.com/up9inc/mizu/agent/pkg/config"
"github.com/up9inc/mizu/agent/pkg/models"
"github.com/up9inc/mizu/agent/pkg/providers"
"github.com/up9inc/mizu/agent/pkg/providers/tapConfig"
"github.com/up9inc/mizu/agent/pkg/providers/tappedPods"
"github.com/up9inc/mizu/agent/pkg/providers/tappers"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/kubernetes"
"github.com/up9inc/mizu/shared/logger"
tapApi "github.com/up9inc/mizu/tap/api"
v1 "k8s.io/api/core/v1"
"mizuserver/pkg/config"
"mizuserver/pkg/models"
"mizuserver/pkg/providers"
"mizuserver/pkg/providers/tapConfig"
"mizuserver/pkg/providers/tappedPods"
"mizuserver/pkg/providers/tappers"
"net/http"
"regexp"
"time"
)
var cancelTapperSyncer context.CancelFunc

View File

@@ -2,13 +2,14 @@ package controllers
import (
"encoding/json"
"mizuserver/pkg/har"
"mizuserver/pkg/models"
"mizuserver/pkg/validation"
"net/http"
"strconv"
"time"
"github.com/up9inc/mizu/agent/pkg/har"
"github.com/up9inc/mizu/agent/pkg/models"
"github.com/up9inc/mizu/agent/pkg/validation"
"github.com/gin-gonic/gin"
basenine "github.com/up9inc/basenine/client/go"

View File

@@ -1,9 +1,10 @@
package controllers
import (
"mizuserver/pkg/providers"
"net/http"
"github.com/up9inc/mizu/agent/pkg/providers"
"github.com/gin-gonic/gin"
"github.com/up9inc/mizu/shared/logger"
)

View File

@@ -1,10 +1,11 @@
package controllers
import (
"github.com/gin-gonic/gin"
"github.com/up9inc/mizu/shared"
"mizuserver/pkg/version"
"net/http"
"github.com/gin-gonic/gin"
"github.com/up9inc/mizu/agent/pkg/version"
"github.com/up9inc/mizu/shared"
)
func GetVersion(c *gin.Context) {

View File

@@ -1,11 +1,12 @@
package controllers
import (
"net/http"
"github.com/chanced/openapi"
"github.com/gin-gonic/gin"
"github.com/up9inc/mizu/agent/pkg/oas"
"github.com/up9inc/mizu/shared/logger"
"mizuserver/pkg/oas"
"net/http"
)
func GetOASServers(c *gin.Context) {

View File

@@ -1,10 +1,11 @@
package controllers
import (
"mizuserver/pkg/oas"
"net/http/httptest"
"testing"
"github.com/up9inc/mizu/agent/pkg/oas"
"github.com/gin-gonic/gin"
)

View File

@@ -1,9 +1,10 @@
package controllers
import (
"mizuserver/pkg/servicemap"
"net/http"
"github.com/up9inc/mizu/agent/pkg/servicemap"
"github.com/gin-gonic/gin"
)

View File

@@ -7,7 +7,7 @@ import (
"net/http/httptest"
"testing"
"mizuserver/pkg/servicemap"
"github.com/up9inc/mizu/agent/pkg/servicemap"
"github.com/gin-gonic/gin"
"github.com/stretchr/testify/suite"

View File

@@ -2,17 +2,18 @@ package controllers
import (
"encoding/json"
"net/http"
"github.com/gin-gonic/gin"
"github.com/up9inc/mizu/agent/pkg/api"
"github.com/up9inc/mizu/agent/pkg/holder"
"github.com/up9inc/mizu/agent/pkg/providers"
"github.com/up9inc/mizu/agent/pkg/providers/tappedPods"
"github.com/up9inc/mizu/agent/pkg/providers/tappers"
"github.com/up9inc/mizu/agent/pkg/up9"
"github.com/up9inc/mizu/agent/pkg/validation"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
"mizuserver/pkg/api"
"mizuserver/pkg/holder"
"mizuserver/pkg/providers"
"mizuserver/pkg/providers/tappedPods"
"mizuserver/pkg/providers/tappers"
"mizuserver/pkg/up9"
"mizuserver/pkg/validation"
"net/http"
)
func HealthCheck(c *gin.Context) {

View File

@@ -1,7 +1,7 @@
package controllers
import (
"mizuserver/pkg/providers"
"github.com/up9inc/mizu/agent/pkg/providers"
"github.com/gin-gonic/gin"
"github.com/up9inc/mizu/shared/logger"

View File

@@ -1,6 +1,6 @@
package holder
import "mizuserver/pkg/resolver"
import "github.com/up9inc/mizu/agent/pkg/resolver"
var k8sResolver *resolver.Resolver
@@ -11,4 +11,3 @@ func SetResolver(param *resolver.Resolver) {
func GetResolver() *resolver.Resolver {
return k8sResolver
}

View File

@@ -1,8 +1,8 @@
package middlewares
import (
"mizuserver/pkg/config"
"mizuserver/pkg/providers"
"github.com/up9inc/mizu/agent/pkg/config"
"github.com/up9inc/mizu/agent/pkg/providers"
"github.com/gin-gonic/gin"
ory "github.com/ory/kratos-client-go"

View File

@@ -2,9 +2,10 @@ package models
import (
"encoding/json"
"github.com/up9inc/mizu/agent/pkg/har"
"github.com/up9inc/mizu/agent/pkg/rules"
tapApi "github.com/up9inc/mizu/tap/api"
"mizuserver/pkg/har"
"mizuserver/pkg/rules"
basenine "github.com/up9inc/basenine/client/go"
"github.com/up9inc/mizu/shared"

View File

@@ -6,13 +6,14 @@ import (
"errors"
"io"
"io/ioutil"
"mizuserver/pkg/har"
"os"
"path/filepath"
"sort"
"strings"
"testing"
"github.com/up9inc/mizu/agent/pkg/har"
"github.com/up9inc/mizu/shared/logger"
)

View File

@@ -3,10 +3,11 @@ package oas
import (
"context"
"encoding/json"
"github.com/up9inc/mizu/shared/logger"
"mizuserver/pkg/har"
"net/url"
"sync"
"github.com/up9inc/mizu/agent/pkg/har"
"github.com/up9inc/mizu/shared/logger"
)
var (

View File

@@ -4,12 +4,13 @@ import (
"encoding/json"
"errors"
"mime"
"mizuserver/pkg/har"
"net/url"
"strconv"
"strings"
"sync"
"github.com/up9inc/mizu/agent/pkg/har"
"github.com/chanced/openapi"
"github.com/google/uuid"
"github.com/up9inc/mizu/shared/logger"

View File

@@ -2,15 +2,16 @@ package oas
import (
"encoding/json"
"github.com/chanced/openapi"
"github.com/op/go-logging"
"github.com/up9inc/mizu/shared/logger"
"io/ioutil"
"mizuserver/pkg/har"
"os"
"strings"
"testing"
"time"
"github.com/chanced/openapi"
"github.com/op/go-logging"
"github.com/up9inc/mizu/agent/pkg/har"
"github.com/up9inc/mizu/shared/logger"
)
// if started via env, write file into subdir

View File

@@ -3,10 +3,11 @@ package oas
import (
"encoding/json"
"errors"
"mizuserver/pkg/har"
"strconv"
"strings"
"github.com/up9inc/mizu/agent/pkg/har"
"github.com/chanced/openapi"
"github.com/up9inc/mizu/shared/logger"
)

View File

@@ -3,7 +3,8 @@ package providers
import (
"context"
"errors"
"mizuserver/pkg/config"
"github.com/up9inc/mizu/agent/pkg/config"
"github.com/up9inc/mizu/shared/logger"

View File

@@ -2,8 +2,9 @@ package providers_test
import (
"fmt"
"mizuserver/pkg/providers"
"testing"
"github.com/up9inc/mizu/agent/pkg/providers"
)
func TestNoEntryAddedCount(t *testing.T) {

View File

@@ -3,12 +3,13 @@ package providers
import (
"encoding/json"
"fmt"
"github.com/patrickmn/go-cache"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/tap"
"mizuserver/pkg/models"
"os"
"time"
"github.com/patrickmn/go-cache"
"github.com/up9inc/mizu/agent/pkg/models"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/tap"
)
const tlsLinkRetainmentTime = time.Minute * 15

View File

@@ -1,12 +1,13 @@
package tapConfig
import (
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
"mizuserver/pkg/models"
"mizuserver/pkg/utils"
"os"
"sync"
"github.com/up9inc/mizu/agent/pkg/models"
"github.com/up9inc/mizu/agent/pkg/utils"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
)
const FilePath = shared.DataDirPath + "tap-config.json"

View File

@@ -1,13 +1,14 @@
package tappedPods
import (
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
"mizuserver/pkg/providers/tappers"
"mizuserver/pkg/utils"
"os"
"strings"
"sync"
"github.com/up9inc/mizu/agent/pkg/providers/tappers"
"github.com/up9inc/mizu/agent/pkg/utils"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
)
const FilePath = shared.DataDirPath + "tapped-pods.json"

View File

@@ -1,11 +1,12 @@
package tappers
import (
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
"mizuserver/pkg/utils"
"os"
"sync"
"github.com/up9inc/mizu/agent/pkg/utils"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
)
const FilePath = shared.DataDirPath + "tappers-status.json"

View File

@@ -15,7 +15,8 @@ more on keto here: https://www.ory.sh/keto/docs/
import (
"fmt"
"mizuserver/pkg/utils"
"github.com/up9inc/mizu/agent/pkg/utils"
ketoClient "github.com/ory/keto-client-go/client"
ketoRead "github.com/ory/keto-client-go/client/read"

View File

@@ -1,8 +1,8 @@
package routes
import (
"mizuserver/pkg/controllers"
"mizuserver/pkg/middlewares"
"github.com/up9inc/mizu/agent/pkg/controllers"
"github.com/up9inc/mizu/agent/pkg/middlewares"
"github.com/gin-gonic/gin"
)

View File

@@ -1,8 +1,8 @@
package routes
import (
"mizuserver/pkg/controllers"
"mizuserver/pkg/middlewares"
"github.com/up9inc/mizu/agent/pkg/controllers"
"github.com/up9inc/mizu/agent/pkg/middlewares"
"github.com/gin-gonic/gin"
)

View File

@@ -1,7 +1,7 @@
package routes
import (
"mizuserver/pkg/controllers"
"github.com/up9inc/mizu/agent/pkg/controllers"
"github.com/gin-gonic/gin"
)

View File

@@ -1,7 +1,7 @@
package routes
import (
"mizuserver/pkg/controllers"
"github.com/up9inc/mizu/agent/pkg/controllers"
"github.com/gin-gonic/gin"
)

View File

@@ -1,8 +1,8 @@
package routes
import (
"mizuserver/pkg/controllers"
"mizuserver/pkg/middlewares"
"github.com/up9inc/mizu/agent/pkg/controllers"
"github.com/up9inc/mizu/agent/pkg/middlewares"
"github.com/gin-gonic/gin"
)

View File

@@ -1,8 +1,8 @@
package routes
import (
"mizuserver/pkg/controllers"
"mizuserver/pkg/middlewares"
"github.com/up9inc/mizu/agent/pkg/controllers"
"github.com/up9inc/mizu/agent/pkg/middlewares"
"github.com/gin-gonic/gin"
)

View File

@@ -1,8 +1,8 @@
package routes
import (
"mizuserver/pkg/controllers"
"mizuserver/pkg/middlewares"
"github.com/up9inc/mizu/agent/pkg/controllers"
"github.com/up9inc/mizu/agent/pkg/middlewares"
"github.com/gin-gonic/gin"
)

View File

@@ -1,8 +1,8 @@
package routes
import (
"mizuserver/pkg/controllers"
"mizuserver/pkg/middlewares"
"github.com/up9inc/mizu/agent/pkg/controllers"
"github.com/up9inc/mizu/agent/pkg/middlewares"
"github.com/gin-gonic/gin"
)

View File

@@ -1,7 +1,7 @@
package routes
import (
"mizuserver/pkg/controllers"
"github.com/up9inc/mizu/agent/pkg/controllers"
"github.com/gin-gonic/gin"
)

View File

@@ -4,11 +4,12 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"mizuserver/pkg/har"
"reflect"
"regexp"
"strings"
"github.com/up9inc/mizu/agent/pkg/har"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/shared"

View File

@@ -6,8 +6,6 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"mizuserver/pkg/har"
"mizuserver/pkg/utils"
"net/http"
"net/url"
"regexp"
@@ -15,6 +13,9 @@ import (
"sync"
"time"
"github.com/up9inc/mizu/agent/pkg/har"
"github.com/up9inc/mizu/agent/pkg/utils"
basenine "github.com/up9inc/basenine/client/go"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"

View File

@@ -33,8 +33,12 @@ func StartServer(app *gin.Engine) {
go func() {
<-signals
logger.Log.Infof("Shutting down...")
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second) //nolint
_ = srv.Shutdown(ctx)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := srv.Shutdown(ctx)
if err != nil {
logger.Log.Errorf("%v", err)
}
os.Exit(0)
}()

View File

@@ -4,7 +4,7 @@
This document describes in details all permissions required for full and correct operation of Mizu.
## Editting permissions
## Editing permissions
During installation, Mizu creates a `ServiceAccount` and the roles it requires. No further action is required.
However, if there is a need, it is possible to make changes to Mizu permissions.

View File

@@ -764,7 +764,7 @@ func (provider *Provider) handleRemovalError(err error) error {
}
func (provider *Provider) CreateConfigMap(ctx context.Context, namespace string, configMapName string, serializedValidationRules string, serializedContract string, serializedMizuConfig string) error {
configMapData := make(map[string]string, 0)
configMapData := make(map[string]string)
if serializedValidationRules != "" {
configMapData[shared.ValidationRulesFileName] = serializedValidationRules
}

View File

@@ -10,7 +10,7 @@ import (
)
func GetNodeHostToTappedPodsMap(tappedPods []core.Pod) map[string][]core.Pod {
nodeToTappedPodMap := make(map[string][]core.Pod, 0)
nodeToTappedPodMap := make(map[string][]core.Pod)
for _, pod := range tappedPods {
minimizedPod := getMinimizedPod(pod)

View File

@@ -6,11 +6,12 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/google/martian/har"
"io/ioutil"
"net/http"
"sync"
"time"
"github.com/google/martian/har"
)
type Protocol struct {
@@ -282,7 +283,7 @@ func (h HTTPPayload) MarshalJSON() ([]byte, error) {
RawResponse: &HTTPResponseWrapper{Response: h.Data.(*http.Response)},
})
default:
panic(fmt.Sprintf("HTTP payload cannot be marshaled: %s", h.Type))
panic(fmt.Sprintf("HTTP payload cannot be marshaled: %v", h.Type))
}
}
@@ -312,7 +313,7 @@ type HTTPRequestWrapper struct {
func (r *HTTPRequestWrapper) MarshalJSON() ([]byte, error) {
body, _ := ioutil.ReadAll(r.Request.Body)
r.Request.Body = ioutil.NopCloser(bytes.NewBuffer(body))
return json.Marshal(&struct {
return json.Marshal(&struct { //nolint
Body string `json:"Body,omitempty"`
GetBody string `json:"GetBody,omitempty"`
Cancel string `json:"Cancel,omitempty"`
@@ -330,7 +331,7 @@ type HTTPResponseWrapper struct {
func (r *HTTPResponseWrapper) MarshalJSON() ([]byte, error) {
body, _ := ioutil.ReadAll(r.Response.Body)
r.Response.Body = ioutil.NopCloser(bytes.NewBuffer(body))
return json.Marshal(&struct {
return json.Marshal(&struct { //nolint
Body string `json:"Body,omitempty"`
GetBody string `json:"GetBody,omitempty"`
Cancel string `json:"Cancel,omitempty"`

View File

@@ -24,14 +24,14 @@ var connectionMethodMap = map[int]string{
61: "connection unblocked",
}
var channelMethodMap = map[int]string{
10: "channel open",
11: "channel open-ok",
20: "channel flow",
21: "channel flow-ok",
40: "channel close",
41: "channel close-ok",
}
// var channelMethodMap = map[int]string{
// 10: "channel open",
// 11: "channel open-ok",
// 20: "channel flow",
// 21: "channel flow-ok",
// 40: "channel close",
// 41: "channel close-ok",
// }
var exchangeMethodMap = map[int]string{
10: "exchange declare",
@@ -78,14 +78,14 @@ var basicMethodMap = map[int]string{
120: "basic nack",
}
var txMethodMap = map[int]string{
10: "tx select",
11: "tx select-ok",
20: "tx commit",
21: "tx commit-ok",
30: "tx rollback",
31: "tx rollback-ok",
}
// var txMethodMap = map[int]string{
// 10: "tx select",
// 11: "tx select-ok",
// 20: "tx commit",
// 21: "tx commit-ok",
// 30: "tx rollback",
// 31: "tx rollback-ok",
// }
type AMQPWrapper struct {
Method string `json:"method"`
@@ -550,14 +550,12 @@ func representConnectionStart(event map[string]interface{}) []interface{} {
headers := make([]api.TableData, 0)
for name, value := range event["serverProperties"].(map[string]interface{}) {
var outcome string
switch value.(type) {
switch v := value.(type) {
case string:
outcome = value.(string)
break
outcome = v
case map[string]interface{}:
x, _ := json.Marshal(value)
outcome = string(x)
break
default:
panic("Unknown data type for the server property!")
}

View File

@@ -47,7 +47,6 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
var remaining int
var header *HeaderFrame
var body []byte
connectionInfo := &api.ConnectionInfo{
ClientIP: tcpID.SrcIP,
@@ -102,13 +101,10 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
eventBasicPublish.Properties = header.Properties
case *BasicDeliver:
eventBasicDeliver.Properties = header.Properties
default:
frame = nil
}
case *BodyFrame:
// continue until terminated
body = append(body, f.Body...)
remaining -= len(f.Body)
switch lastMethodFrameMessage.(type) {
case *BasicPublish:
@@ -119,9 +115,6 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
eventBasicDeliver.Body = f.Body
superIdentifier.Protocol = &protocol
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, superTimer.CaptureTime, emitter)
default:
body = nil
frame = nil
}
case *MethodFrame:
@@ -211,10 +204,6 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
superIdentifier.Protocol = &protocol
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, superTimer.CaptureTime, emitter)
default:
frame = nil
}
default:
@@ -231,32 +220,24 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
switch request["method"] {
case basicMethodMap[40]:
summary = reqDetails["exchange"].(string)
break
case basicMethodMap[60]:
summary = reqDetails["exchange"].(string)
break
case exchangeMethodMap[10]:
summary = reqDetails["exchange"].(string)
break
case queueMethodMap[10]:
summary = reqDetails["queue"].(string)
break
case connectionMethodMap[10]:
summary = fmt.Sprintf(
"%s.%s",
strconv.Itoa(int(reqDetails["versionMajor"].(float64))),
strconv.Itoa(int(reqDetails["versionMinor"].(float64))),
)
break
case connectionMethodMap[50]:
summary = reqDetails["replyText"].(string)
break
case queueMethodMap[20]:
summary = reqDetails["queue"].(string)
break
case basicMethodMap[20]:
summary = reqDetails["queue"].(string)
break
}
request["url"] = summary
@@ -288,33 +269,25 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) {
bodySize = 0
representation := make(map[string]interface{}, 0)
representation := make(map[string]interface{})
var repRequest []interface{}
switch request["method"].(string) {
case basicMethodMap[40]:
repRequest = representBasicPublish(request)
break
case basicMethodMap[60]:
repRequest = representBasicDeliver(request)
break
case queueMethodMap[10]:
repRequest = representQueueDeclare(request)
break
case exchangeMethodMap[10]:
repRequest = representExchangeDeclare(request)
break
case connectionMethodMap[10]:
repRequest = representConnectionStart(request)
break
case connectionMethodMap[50]:
repRequest = representConnectionClose(request)
break
case queueMethodMap[20]:
repRequest = representQueueBind(request)
break
case basicMethodMap[20]:
repRequest = representBasicConsume(request)
break
}
representation["request"] = repRequest
object, err = json.Marshal(representation)

View File

@@ -135,20 +135,6 @@ func readDecimal(r io.Reader) (v Decimal, err error) {
return
}
func readFloat32(r io.Reader) (v float32, err error) {
if err = binary.Read(r, binary.BigEndian, &v); err != nil {
return
}
return
}
func readFloat64(r io.Reader) (v float64, err error) {
if err = binary.Read(r, binary.BigEndian, &v); err != nil {
return
}
return
}
func readTimestamp(r io.Reader) (v time.Time, err error) {
var sec int64
if err = binary.Read(r, binary.BigEndian, &sec); err != nil {

View File

@@ -78,15 +78,6 @@ type Error struct {
Recover bool // true when this error can be recovered by retrying later or with different parameters
}
func newError(code uint16, text string) *Error {
return &Error{
Code: int(code),
Reason: text,
Recover: isSoftExceptionCode(int(code)),
Server: true,
}
}
func (e Error) Error() string {
return fmt.Sprintf("Exception (%d) Reason: %q", e.Code, e.Reason)
}
@@ -262,19 +253,6 @@ func (t Table) Validate() error {
return validateField(t)
}
// Heap interface for maintaining delivery tags
type tagSet []uint64
func (set tagSet) Len() int { return len(set) }
func (set tagSet) Less(i, j int) bool { return (set)[i] < (set)[j] }
func (set tagSet) Swap(i, j int) { (set)[i], (set)[j] = (set)[j], (set)[i] }
func (set *tagSet) Push(tag interface{}) { *set = append(*set, tag.(uint64)) }
func (set *tagSet) Pop() interface{} {
val := (*set)[len(*set)-1]
*set = (*set)[:len(*set)-1]
return val
}
type Message interface {
id() (uint16, uint16)
wait() bool
@@ -282,12 +260,6 @@ type Message interface {
write(io.Writer) error
}
type messageWithContent interface {
Message
getContent() (Properties, []byte)
setContent(Properties, []byte)
}
/*
The base interface implemented as:
@@ -322,22 +294,6 @@ type AmqpReader struct {
R io.Reader
}
type writer struct {
w io.Writer
}
// Implements the frame interface for Connection RPC
type protocolHeader struct{}
func (protocolHeader) write(w io.Writer) error {
_, err := w.Write([]byte{'A', 'M', 'Q', 'P', 0, 0, 9, 1})
return err
}
func (protocolHeader) channel() uint16 {
panic("only valid as initial handshake")
}
/*
Method frames carry the high-level protocol commands (which we call "methods").
One method frame carries one command. The method frame payload has this format:

View File

@@ -6,7 +6,6 @@
package amqp
import (
"bufio"
"bytes"
"encoding/binary"
"errors"
@@ -15,18 +14,6 @@ import (
"time"
)
func (w *writer) WriteFrame(frame frame) (err error) {
if err = frame.write(w.w); err != nil {
return
}
if buf, ok := w.w.(*bufio.Writer); ok {
err = buf.Flush()
}
return
}
func (f *MethodFrame) write(w io.Writer) (err error) {
var payload bytes.Buffer
@@ -412,5 +399,5 @@ func writeTable(w io.Writer, table Table) (err error) {
}
}
return writeLongstr(w, string(buf.Bytes()))
return writeLongstr(w, buf.String())
}

View File

@@ -81,7 +81,7 @@ func representSliceAsTable(slice []interface{}, selectorPrefix string) (represen
selector := fmt.Sprintf("%s[%d]", selectorPrefix, i)
table = append(table, api.TableData{
Name: strconv.Itoa(i),
Value: item.(interface{}),
Value: item,
Selector: selector,
})
}

View File

@@ -92,11 +92,15 @@ func (d dissecting) Ping() {
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter, options *api.TrafficFilteringOptions) error {
isHTTP2, err := checkIsHTTP2Connection(b, isClient)
var err error
isHTTP2, _ := checkIsHTTP2Connection(b, isClient)
var http2Assembler *Http2Assembler
if isHTTP2 {
prepareHTTP2Connection(b, isClient)
err = prepareHTTP2Connection(b, isClient)
if err != nil {
return err
}
http2Assembler = createHTTP2Assembler(b)
}
@@ -105,7 +109,13 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
if switchingProtocolsHTTP2 {
switchingProtocolsHTTP2 = false
isHTTP2, err = checkIsHTTP2Connection(b, isClient)
prepareHTTP2Connection(b, isClient)
if err != nil {
break
}
err = prepareHTTP2Connection(b, isClient)
if err != nil {
break
}
http2Assembler = createHTTP2Assembler(b)
}
@@ -340,11 +350,11 @@ func representRequest(request map[string]interface{}) (repRequest []interface{})
})
postData, _ := request["postData"].(map[string]interface{})
mimeType, _ := postData["mimeType"]
mimeType := postData["mimeType"]
if mimeType == nil || len(mimeType.(string)) == 0 {
mimeType = "text/html"
}
text, _ := postData["text"]
text := postData["text"]
if text != nil {
repRequest = append(repRequest, api.SectionData{
Type: api.BODY,
@@ -424,12 +434,12 @@ func representResponse(response map[string]interface{}) (repResponse []interface
})
content, _ := response["content"].(map[string]interface{})
mimeType, _ := content["mimeType"]
mimeType := content["mimeType"]
if mimeType == nil || len(mimeType.(string)) == 0 {
mimeType = "text/html"
}
encoding, _ := content["encoding"]
text, _ := content["text"]
encoding := content["encoding"]
text := content["text"]
if text != nil {
repResponse = append(repResponse, api.SectionData{
Type: api.BODY,
@@ -445,7 +455,7 @@ func representResponse(response map[string]interface{}) (repResponse []interface
}
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) {
representation := make(map[string]interface{}, 0)
representation := make(map[string]interface{})
repRequest := representRequest(request)
repResponse, bodySize := representResponse(response)
representation["request"] = repRequest

View File

@@ -89,7 +89,7 @@ func filterResponseBody(response *http.Response, options *api.TrafficFilteringOp
}
func filterHeaders(headers *http.Header) {
for key, _ := range *headers {
for key := range *headers {
if strings.ToLower(key) == userAgent {
continue
}
@@ -103,7 +103,7 @@ func filterHeaders(headers *http.Header) {
}
func getContentTypeHeaderValue(headers http.Header) string {
for key, _ := range headers {
for key := range headers {
if strings.ToLower(key) == "content-type" {
return headers.Get(key)
}

View File

@@ -4,7 +4,6 @@ import (
"bytes"
"fmt"
"io"
"math"
"sync"
"sync/atomic"
)
@@ -201,25 +200,6 @@ func newPageBuffer() *pageBuffer {
return b
}
func (pb *pageBuffer) refTo(ref *pageRef, begin, end int64) {
length := end - begin
if length > math.MaxUint32 {
panic("reference to contiguous buffer pages exceeds the maximum size of 4 GB")
}
ref.pages = append(ref.buffer[:0], pb.pages.slice(begin, end)...)
ref.pages.ref()
ref.offset = begin
ref.length = uint32(length)
}
func (pb *pageBuffer) ref(begin, end int64) *pageRef {
ref := new(pageRef)
pb.refTo(ref, begin, end)
return ref
}
func (pb *pageBuffer) unref() {
pb.refc.unref(func() {
pb.pages.unref()
@@ -353,12 +333,12 @@ func (pb *pageBuffer) Write(b []byte) (int, error) {
free := tail.Cap() - tail.Len()
if len(b) <= free {
tail.Write(b)
_, _ = tail.Write(b)
pb.length += len(b)
break
}
tail.Write(b[:free])
_, _ = tail.Write(b[:free])
b = b[free:]
pb.length += free
@@ -374,7 +354,7 @@ func (pb *pageBuffer) WriteAt(b []byte, off int64) (int, error) {
return n, err
}
if n < len(b) {
pb.Write(b[n:])
_, _ = pb.Write(b[n:])
}
return len(b), nil
}
@@ -406,12 +386,6 @@ var (
type contiguousPages []*page
func (pages contiguousPages) ref() {
for _, p := range pages {
p.ref()
}
}
func (pages contiguousPages) unref() {
for _, p := range pages {
p.unref()
@@ -480,7 +454,6 @@ var (
)
type pageRef struct {
buffer [2]*page
pages contiguousPages
offset int64
cursor int64
@@ -590,28 +563,6 @@ var (
_ io.WriterTo = (*pageRef)(nil)
)
type pageRefAllocator struct {
refs []pageRef
head int
size int
}
func (a *pageRefAllocator) newPageRef() *pageRef {
if a.head == len(a.refs) {
a.refs = make([]pageRef, a.size)
a.head = 0
}
ref := &a.refs[a.head]
a.head++
return ref
}
func unref(x interface{}) {
if r, _ := x.(interface{ unref() }); r != nil {
r.unref()
}
}
func seek(cursor, limit, offset int64, whence int) (int64, error) {
switch whence {
case io.SeekStart:
@@ -631,15 +582,3 @@ func seek(cursor, limit, offset int64, whence int) (int64, error) {
}
return offset, nil
}
func closeBytes(b Bytes) {
if b != nil {
b.Close()
}
}
func resetBytes(b Bytes) {
if r, _ := b.(interface{ Reset() }); r != nil {
r.Reset()
}
}

View File

@@ -1,8 +1,6 @@
package kafka
import (
"errors"
"github.com/segmentio/kafka-go/compress"
)
@@ -12,19 +10,3 @@ type CompressionCodec = compress.Codec
// TODO: this file should probably go away once the internals of the package
// have moved to use the protocol package.
const (
compressionCodecMask = 0x07
)
var (
errUnknownCodec = errors.New("the compression code is invalid or its codec has not been imported")
)
// resolveCodec looks up a codec by Code()
func resolveCodec(code int8) (CompressionCodec, error) {
codec := compress.Compression(code).Codec()
if codec == nil {
return nil, errUnknownCodec
}
return codec, nil
}

View File

@@ -58,14 +58,6 @@ func (d *decoder) ReadByte() (byte, error) {
return c, d.err
}
func (d *decoder) done() bool {
return d.remain == 0 || d.err != nil
}
func (d *decoder) setCRC(table *crc32.Table) {
d.table, d.crc32 = table, 0
}
func (d *decoder) decodeBool(v value) {
v.setBool(d.readBool())
}
@@ -199,19 +191,6 @@ func (d *decoder) read(n int) []byte {
return b
}
func (d *decoder) writeTo(w io.Writer, n int) {
limit := d.remain
if n < limit {
d.remain = n
}
c, err := io.Copy(w, d)
if int(c) < n && err == nil {
err = io.ErrUnexpectedEOF
}
d.remain = limit - int(c)
d.setError(err)
}
func (d *decoder) setError(err error) {
if d.err == nil && err != nil {
d.err = err
@@ -272,14 +251,6 @@ func (d *decoder) readString() string {
}
}
func (d *decoder) readVarString() string {
if n := d.readVarInt(); n < 0 {
return ""
} else {
return bytesToString(d.read(int(n)))
}
}
func (d *decoder) readCompactString() string {
if n := d.readUnsignedVarInt(); n < 1 {
return ""
@@ -296,32 +267,6 @@ func (d *decoder) readBytes() []byte {
}
}
func (d *decoder) readBytesTo(w io.Writer) bool {
if n := d.readInt32(); n < 0 {
return false
} else {
d.writeTo(w, int(n))
return d.err == nil
}
}
func (d *decoder) readVarBytes() []byte {
if n := d.readVarInt(); n < 0 {
return nil
} else {
return d.read(int(n))
}
}
func (d *decoder) readVarBytesTo(w io.Writer) bool {
if n := d.readVarInt(); n < 0 {
return false
} else {
d.writeTo(w, int(n))
return d.err == nil
}
}
func (d *decoder) readCompactBytes() []byte {
if n := d.readUnsignedVarInt(); n < 1 {
return nil
@@ -330,15 +275,6 @@ func (d *decoder) readCompactBytes() []byte {
}
}
func (d *decoder) readCompactBytesTo(w io.Writer) bool {
if n := d.readUnsignedVarInt(); n < 1 {
return false
} else {
d.writeTo(w, int(n-1))
return d.err == nil
}
}
func (d *decoder) readVarInt() int64 {
n := 11 // varints are at most 11 bytes

View File

@@ -14,37 +14,3 @@ func discardN(r *bufio.Reader, sz int, n int) (int, error) {
}
return sz - n, err
}
func discardInt8(r *bufio.Reader, sz int) (int, error) {
return discardN(r, sz, 1)
}
func discardInt16(r *bufio.Reader, sz int) (int, error) {
return discardN(r, sz, 2)
}
func discardInt32(r *bufio.Reader, sz int) (int, error) {
return discardN(r, sz, 4)
}
func discardInt64(r *bufio.Reader, sz int) (int, error) {
return discardN(r, sz, 8)
}
func discardString(r *bufio.Reader, sz int) (int, error) {
return readStringWith(r, sz, func(r *bufio.Reader, sz int, n int) (int, error) {
if n < 0 {
return sz, nil
}
return discardN(r, sz, n)
})
}
func discardBytes(r *bufio.Reader, sz int) (int, error) {
return readBytesWith(r, sz, func(r *bufio.Reader, sz int, n int) (int, error) {
if n < 0 {
return sz, nil
}
return discardN(r, sz, n)
})
}

View File

@@ -3,7 +3,6 @@ package kafka
import (
"bytes"
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"reflect"
@@ -95,10 +94,6 @@ func (e *encoder) WriteString(s string) (int, error) {
return n, nil
}
func (e *encoder) setCRC(table *crc32.Table) {
e.table, e.crc32 = table, 0
}
func (e *encoder) update(b []byte) {
if e.table != nil {
e.crc32 = crc32.Update(e.crc32, e.table, b)
@@ -133,10 +128,6 @@ func (e *encoder) encodeString(v value) {
e.writeString(v.string())
}
func (e *encoder) encodeVarString(v value) {
e.writeVarString(v.string())
}
func (e *encoder) encodeCompactString(v value) {
e.writeCompactString(v.string())
}
@@ -145,10 +136,6 @@ func (e *encoder) encodeNullString(v value) {
e.writeNullString(v.string())
}
func (e *encoder) encodeVarNullString(v value) {
e.writeVarNullString(v.string())
}
func (e *encoder) encodeCompactNullString(v value) {
e.writeCompactNullString(v.string())
}
@@ -157,10 +144,6 @@ func (e *encoder) encodeBytes(v value) {
e.writeBytes(v.bytes())
}
func (e *encoder) encodeVarBytes(v value) {
e.writeVarBytes(v.bytes())
}
func (e *encoder) encodeCompactBytes(v value) {
e.writeCompactBytes(v.bytes())
}
@@ -169,10 +152,6 @@ func (e *encoder) encodeNullBytes(v value) {
e.writeNullBytes(v.bytes())
}
func (e *encoder) encodeVarNullBytes(v value) {
e.writeVarNullBytes(v.bytes())
}
func (e *encoder) encodeCompactNullBytes(v value) {
e.writeCompactNullBytes(v.bytes())
}
@@ -228,37 +207,32 @@ func (e *encoder) encodeCompactNullArray(v value, elemType reflect.Type, encodeE
func (e *encoder) writeInt8(i int8) {
writeInt8(e.buffer[:1], i)
e.Write(e.buffer[:1])
_, _ = e.Write(e.buffer[:1])
}
func (e *encoder) writeInt16(i int16) {
writeInt16(e.buffer[:2], i)
e.Write(e.buffer[:2])
_, _ = e.Write(e.buffer[:2])
}
func (e *encoder) writeInt32(i int32) {
writeInt32(e.buffer[:4], i)
e.Write(e.buffer[:4])
_, _ = e.Write(e.buffer[:4])
}
func (e *encoder) writeInt64(i int64) {
writeInt64(e.buffer[:8], i)
e.Write(e.buffer[:8])
_, _ = e.Write(e.buffer[:8])
}
func (e *encoder) writeString(s string) {
e.writeInt16(int16(len(s)))
e.WriteString(s)
}
func (e *encoder) writeVarString(s string) {
e.writeVarInt(int64(len(s)))
e.WriteString(s)
_, _ = e.WriteString(s)
}
func (e *encoder) writeCompactString(s string) {
e.writeUnsignedVarInt(uint64(len(s)) + 1)
e.WriteString(s)
_, _ = e.WriteString(s)
}
func (e *encoder) writeNullString(s string) {
@@ -266,16 +240,7 @@ func (e *encoder) writeNullString(s string) {
e.writeInt16(-1)
} else {
e.writeInt16(int16(len(s)))
e.WriteString(s)
}
}
func (e *encoder) writeVarNullString(s string) {
if s == "" {
e.writeVarInt(-1)
} else {
e.writeVarInt(int64(len(s)))
e.WriteString(s)
_, _ = e.WriteString(s)
}
}
@@ -284,23 +249,18 @@ func (e *encoder) writeCompactNullString(s string) {
e.writeUnsignedVarInt(0)
} else {
e.writeUnsignedVarInt(uint64(len(s)) + 1)
e.WriteString(s)
_, _ = e.WriteString(s)
}
}
func (e *encoder) writeBytes(b []byte) {
e.writeInt32(int32(len(b)))
e.Write(b)
}
func (e *encoder) writeVarBytes(b []byte) {
e.writeVarInt(int64(len(b)))
e.Write(b)
_, _ = e.Write(b)
}
func (e *encoder) writeCompactBytes(b []byte) {
e.writeUnsignedVarInt(uint64(len(b)) + 1)
e.Write(b)
_, _ = e.Write(b)
}
func (e *encoder) writeNullBytes(b []byte) {
@@ -308,16 +268,7 @@ func (e *encoder) writeNullBytes(b []byte) {
e.writeInt32(-1)
} else {
e.writeInt32(int32(len(b)))
e.Write(b)
}
}
func (e *encoder) writeVarNullBytes(b []byte) {
if b == nil {
e.writeVarInt(-1)
} else {
e.writeVarInt(int64(len(b)))
e.Write(b)
_, _ = e.Write(b)
}
}
@@ -326,69 +277,10 @@ func (e *encoder) writeCompactNullBytes(b []byte) {
e.writeUnsignedVarInt(0)
} else {
e.writeUnsignedVarInt(uint64(len(b)) + 1)
e.Write(b)
_, _ = e.Write(b)
}
}
func (e *encoder) writeBytesFrom(b Bytes) error {
size := int64(b.Len())
e.writeInt32(int32(size))
n, err := io.Copy(e, b)
if err == nil && n != size {
err = fmt.Errorf("size of bytes does not match the number of bytes that were written (size=%d, written=%d): %w", size, n, io.ErrUnexpectedEOF)
}
return err
}
func (e *encoder) writeNullBytesFrom(b Bytes) error {
if b == nil {
e.writeInt32(-1)
return nil
} else {
size := int64(b.Len())
e.writeInt32(int32(size))
n, err := io.Copy(e, b)
if err == nil && n != size {
err = fmt.Errorf("size of nullable bytes does not match the number of bytes that were written (size=%d, written=%d): %w", size, n, io.ErrUnexpectedEOF)
}
return err
}
}
func (e *encoder) writeVarNullBytesFrom(b Bytes) error {
if b == nil {
e.writeVarInt(-1)
return nil
} else {
size := int64(b.Len())
e.writeVarInt(size)
n, err := io.Copy(e, b)
if err == nil && n != size {
err = fmt.Errorf("size of nullable bytes does not match the number of bytes that were written (size=%d, written=%d): %w", size, n, io.ErrUnexpectedEOF)
}
return err
}
}
func (e *encoder) writeCompactNullBytesFrom(b Bytes) error {
if b == nil {
e.writeUnsignedVarInt(0)
return nil
} else {
size := int64(b.Len())
e.writeUnsignedVarInt(uint64(size + 1))
n, err := io.Copy(e, b)
if err == nil && n != size {
err = fmt.Errorf("size of compact nullable bytes does not match the number of bytes that were written (size=%d, written=%d): %w", size, n, io.ErrUnexpectedEOF)
}
return err
}
}
func (e *encoder) writeVarInt(i int64) {
e.writeUnsignedVarInt(uint64((i << 1) ^ (i >> 63)))
}
func (e *encoder) writeUnsignedVarInt(i uint64) {
b := e.buffer[:]
n := 0
@@ -404,7 +296,7 @@ func (e *encoder) writeUnsignedVarInt(i uint64) {
n++
}
e.Write(b[:n])
_, _ = e.Write(b[:n])
}
type encodeFunc func(*encoder, value)
@@ -530,7 +422,7 @@ func structEncodeFuncOf(typ reflect.Type, version int16, flexible bool) encodeFu
se := &encoder{writer: buf}
f.encode(se, v.fieldByIndex(f.index))
e.writeUnsignedVarInt(uint64(buf.Len()))
e.Write(buf.Bytes())
_, _ = e.Write(buf.Bytes())
}
}
}

View File

@@ -346,7 +346,13 @@ func representProduceRequest(data map[string]interface{}) []interface{} {
})
obj, err := oj.ParseString(string(partitionsJson))
if err != nil {
return rep
}
recordBatchPath, err := jp.ParseString(`partitionData.records.recordBatch`)
if err != nil {
return rep
}
recordBatchresults := recordBatchPath.Get(obj)
if len(recordBatchresults) > 0 {
rep = append(rep, api.SectionData{
@@ -357,6 +363,9 @@ func representProduceRequest(data map[string]interface{}) []interface{} {
}
recordsPath, err := jp.ParseString(`partitionData.records.recordBatch.record`)
if err != nil {
return rep
}
recordsResults := recordsPath.Get(obj)
if len(recordsResults) > 0 {
records := recordsResults[0].([]interface{})

View File

@@ -81,10 +81,8 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
if len(summary) > 0 {
summary = summary[:len(summary)-2]
}
break
case ApiVersions:
summary = reqDetails["clientID"].(string)
break
case Produce:
_topics := reqDetails["payload"].(map[string]interface{})["topicData"]
if _topics == nil {
@@ -97,7 +95,6 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
if len(summary) > 0 {
summary = summary[:len(summary)-2]
}
break
case Fetch:
_topics := reqDetails["payload"].(map[string]interface{})["topics"]
if _topics == nil {
@@ -110,7 +107,6 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
if len(summary) > 0 {
summary = summary[:len(summary)-2]
}
break
case ListOffsets:
_topics := reqDetails["payload"].(map[string]interface{})["topics"]
if _topics == nil {
@@ -123,7 +119,6 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
if len(summary) > 0 {
summary = summary[:len(summary)-2]
}
break
case CreateTopics:
topics := reqDetails["payload"].(map[string]interface{})["topics"].([]interface{})
for _, topic := range topics {
@@ -132,13 +127,11 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
if len(summary) > 0 {
summary = summary[:len(summary)-2]
}
break
case DeleteTopics:
topicNames := reqDetails["topicNames"].([]string)
for _, name := range topicNames {
summary += fmt.Sprintf("%s, ", name)
}
break
}
request["url"] = summary
@@ -173,7 +166,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) {
bodySize = 0
representation := make(map[string]interface{}, 0)
representation := make(map[string]interface{})
apiKey := ApiKey(request["apiKey"].(float64))
@@ -183,31 +176,24 @@ func (d dissecting) Represent(request map[string]interface{}, response map[strin
case Metadata:
repRequest = representMetadataRequest(request)
repResponse = representMetadataResponse(response)
break
case ApiVersions:
repRequest = representApiVersionsRequest(request)
repResponse = representApiVersionsResponse(response)
break
case Produce:
repRequest = representProduceRequest(request)
repResponse = representProduceResponse(response)
break
case Fetch:
repRequest = representFetchRequest(request)
repResponse = representFetchResponse(response)
break
case ListOffsets:
repRequest = representListOffsetsRequest(request)
repResponse = representListOffsetsResponse(response)
break
case CreateTopics:
repRequest = representCreateTopicsRequest(request)
repResponse = representCreateTopicsResponse(response)
break
case DeleteTopics:
repRequest = representDeleteTopicsRequest(request)
repResponse = representDeleteTopicsResponse(response)
break
}
representation["request"] = repRequest

View File

@@ -26,9 +26,9 @@ func CreateResponseRequestMatcher() requestResponseMatcher {
func (matcher *requestResponseMatcher) registerRequest(key string, request *Request) *RequestResponsePair {
if response, found := matcher.openMessagesMap.LoadAndDelete(key); found {
// Check for a situation that only occurs when a Kafka broker is initiating
switch response.(type) {
switch v := response.(type) {
case *Response:
return matcher.preparePair(request, response.(*Response))
return matcher.preparePair(request, v)
}
}

View File

@@ -167,10 +167,6 @@ type messageType struct {
encode encodeFunc
}
func (t *messageType) new() Message {
return reflect.New(t.gotype).Interface().(Message)
}
type apiType struct {
requests []messageType
responses []messageType
@@ -389,16 +385,16 @@ func (b Broker) String() string {
func (b Broker) Format(w fmt.State, v rune) {
switch v {
case 'd':
io.WriteString(w, itoa(b.ID))
_, _ = io.WriteString(w, itoa(b.ID))
case 's':
io.WriteString(w, b.String())
_, _ = io.WriteString(w, b.String())
case 'v':
io.WriteString(w, itoa(b.ID))
io.WriteString(w, " ")
io.WriteString(w, b.String())
_, _ = io.WriteString(w, itoa(b.ID))
_, _ = io.WriteString(w, " ")
_, _ = io.WriteString(w, b.String())
if b.Rack != "" {
io.WriteString(w, " ")
io.WriteString(w, b.Rack)
_, _ = io.WriteString(w, " ")
_, _ = io.WriteString(w, b.Rack)
}
}
}

View File

@@ -99,10 +99,8 @@ func (k apiKey) String() string {
return strconv.Itoa(int(k))
}
type apiVersion int16
const (
v0 = 0
// v0 = 0
v1 = 1
v2 = 2
v3 = 3
@@ -113,6 +111,7 @@ const (
v8 = 8
v9 = 9
v10 = 10
v11 = 11
)
var apiKeyStrings = [...]string{
@@ -166,35 +165,6 @@ var apiKeyStrings = [...]string{
offsetDelete: "OffsetDelete",
}
type requestHeader struct {
Size int32
ApiKey int16
ApiVersion int16
CorrelationID int32
ClientID string
}
func sizeofString(s string) int32 {
return 2 + int32(len(s))
}
func (h requestHeader) size() int32 {
return 4 + 2 + 2 + 4 + sizeofString(h.ClientID)
}
// func (h requestHeader) writeTo(wb *writeBuffer) {
// wb.writeInt32(h.Size)
// wb.writeInt16(h.ApiKey)
// wb.writeInt16(h.ApiVersion)
// wb.writeInt32(h.CorrelationID)
// wb.writeString(h.ClientID)
// }
type request interface {
size() int32
// writable
}
func makeInt8(b []byte) int8 {
return int8(b[0])
}
@@ -210,10 +180,3 @@ func makeInt32(b []byte) int32 {
func makeInt64(b []byte) int64 {
return int64(binary.BigEndian.Uint64(b))
}
func expectZeroSize(sz int, err error) error {
if err == nil && sz != 0 {
err = fmt.Errorf("reading a response left %d unread bytes", sz)
}
return err
}

View File

@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"io"
"reflect"
)
type readable interface {
@@ -42,59 +41,6 @@ func readInt64(r *bufio.Reader, sz int, v *int64) (int, error) {
return peekRead(r, sz, 8, func(b []byte) { *v = makeInt64(b) })
}
func readVarInt(r *bufio.Reader, sz int, v *int64) (remain int, err error) {
// Optimistically assume that most of the time, there will be data buffered
// in the reader. If this is not the case, the buffer will be refilled after
// consuming zero bytes from the input.
input, _ := r.Peek(r.Buffered())
x := uint64(0)
s := uint(0)
for {
if len(input) > sz {
input = input[:sz]
}
for i, b := range input {
if b < 0x80 {
x |= uint64(b) << s
*v = int64(x>>1) ^ -(int64(x) & 1)
n, err := r.Discard(i + 1)
return sz - n, err
}
x |= uint64(b&0x7f) << s
s += 7
}
// Make room in the input buffer to load more data from the underlying
// stream. The x and s variables are left untouched, ensuring that the
// varint decoding can continue on the next loop iteration.
n, _ := r.Discard(len(input))
sz -= n
if sz == 0 {
return 0, errShortRead
}
// Fill the buffer: ask for one more byte, but in practice the reader
// will load way more from the underlying stream.
if _, err := r.Peek(1); err != nil {
if err == io.EOF {
err = errShortRead
}
return sz, err
}
// Grab as many bytes as possible from the buffer, then go on to the
// next loop iteration which is going to consume it.
input, _ = r.Peek(r.Buffered())
}
}
func readBool(r *bufio.Reader, sz int, v *bool) (int, error) {
return peekRead(r, sz, 1, func(b []byte) { *v = b[0] != 0 })
}
func readString(r *bufio.Reader, sz int, v *string) (int, error) {
return readStringWith(r, sz, func(r *bufio.Reader, sz int, n int) (remain int, err error) {
*v, remain, err = readNewString(r, sz, n)
@@ -179,102 +125,6 @@ func readArrayLen(r *bufio.Reader, sz int, n *int) (int, error) {
return sz, nil
}
func readArrayWith(r *bufio.Reader, sz int, cb func(*bufio.Reader, int) (int, error)) (int, error) {
var err error
var len int32
if sz, err = readInt32(r, sz, &len); err != nil {
return sz, err
}
for n := int(len); n > 0; n-- {
if sz, err = cb(r, sz); err != nil {
break
}
}
return sz, err
}
func readStringArray(r *bufio.Reader, sz int, v *[]string) (remain int, err error) {
var content []string
fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
var value string
if fnRemain, fnErr = readString(r, size, &value); fnErr != nil {
return
}
content = append(content, value)
return
}
if remain, err = readArrayWith(r, sz, fn); err != nil {
return
}
*v = content
return
}
func readMapStringInt32(r *bufio.Reader, sz int, v *map[string][]int32) (remain int, err error) {
var len int32
if remain, err = readInt32(r, sz, &len); err != nil {
return
}
content := make(map[string][]int32, len)
for i := 0; i < int(len); i++ {
var key string
var values []int32
if remain, err = readString(r, remain, &key); err != nil {
return
}
fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
var value int32
if fnRemain, fnErr = readInt32(r, size, &value); fnErr != nil {
return
}
values = append(values, value)
return
}
if remain, err = readArrayWith(r, remain, fn); err != nil {
return
}
content[key] = values
}
*v = content
return
}
func read(r *bufio.Reader, sz int, a interface{}) (int, error) {
switch v := a.(type) {
case *int8:
return readInt8(r, sz, v)
case *int16:
return readInt16(r, sz, v)
case *int32:
return readInt32(r, sz, v)
case *int64:
return readInt64(r, sz, v)
case *bool:
return readBool(r, sz, v)
case *string:
return readString(r, sz, v)
case *[]byte:
return readBytes(r, sz, v)
}
switch v := reflect.ValueOf(a).Elem(); v.Kind() {
case reflect.Struct:
return readStruct(r, sz, v)
case reflect.Slice:
return readSlice(r, sz, v)
default:
panic(fmt.Sprintf("unsupported type: %T", a))
}
}
func ReadAll(r *bufio.Reader, sz int, ptrs ...interface{}) (int, error) {
var err error
@@ -307,333 +157,3 @@ func readPtr(r *bufio.Reader, sz int, ptr interface{}) (int, error) {
panic(fmt.Sprintf("unsupported type: %T", v))
}
}
func readStruct(r *bufio.Reader, sz int, v reflect.Value) (int, error) {
var err error
for i, n := 0, v.NumField(); i != n; i++ {
if sz, err = read(r, sz, v.Field(i).Addr().Interface()); err != nil {
return sz, err
}
}
return sz, nil
}
func readSlice(r *bufio.Reader, sz int, v reflect.Value) (int, error) {
var err error
var len int32
if sz, err = readInt32(r, sz, &len); err != nil {
return sz, err
}
if n := int(len); n < 0 {
v.Set(reflect.Zero(v.Type()))
} else {
v.Set(reflect.MakeSlice(v.Type(), n, n))
for i := 0; i != n; i++ {
if sz, err = read(r, sz, v.Index(i).Addr().Interface()); err != nil {
return sz, err
}
}
}
return sz, nil
}
func readFetchResponseHeaderV2(r *bufio.Reader, size int) (throttle int32, watermark int64, remain int, err error) {
var n int32
var p struct {
Partition int32
ErrorCode int16
HighwaterMarkOffset int64
MessageSetSize int32
}
if remain, err = readInt32(r, size, &throttle); err != nil {
return
}
if remain, err = readInt32(r, remain, &n); err != nil {
return
}
// This error should never trigger, unless there's a bug in the kafka client
// or server.
if n != 1 {
err = fmt.Errorf("1 kafka topic was expected in the fetch response but the client received %d", n)
return
}
// We ignore the topic name because we've requests messages for a single
// topic, unless there's a bug in the kafka server we will have received
// the name of the topic that we requested.
if remain, err = discardString(r, remain); err != nil {
return
}
if remain, err = readInt32(r, remain, &n); err != nil {
return
}
// This error should never trigger, unless there's a bug in the kafka client
// or server.
if n != 1 {
err = fmt.Errorf("1 kafka partition was expected in the fetch response but the client received %d", n)
return
}
if remain, err = read(r, remain, &p); err != nil {
return
}
if p.ErrorCode != 0 {
err = Error(p.ErrorCode)
return
}
// This error should never trigger, unless there's a bug in the kafka client
// or server.
if remain != int(p.MessageSetSize) {
err = fmt.Errorf("the size of the message set in a fetch response doesn't match the number of remaining bytes (message set size = %d, remaining bytes = %d)", p.MessageSetSize, remain)
return
}
watermark = p.HighwaterMarkOffset
return
}
func readFetchResponseHeaderV5(r *bufio.Reader, size int) (throttle int32, watermark int64, remain int, err error) {
var n int32
type AbortedTransaction struct {
ProducerId int64
FirstOffset int64
}
var p struct {
Partition int32
ErrorCode int16
HighwaterMarkOffset int64
LastStableOffset int64
LogStartOffset int64
}
var messageSetSize int32
var abortedTransactions []AbortedTransaction
if remain, err = readInt32(r, size, &throttle); err != nil {
return
}
if remain, err = readInt32(r, remain, &n); err != nil {
return
}
// This error should never trigger, unless there's a bug in the kafka client
// or server.
if n != 1 {
err = fmt.Errorf("1 kafka topic was expected in the fetch response but the client received %d", n)
return
}
// We ignore the topic name because we've requests messages for a single
// topic, unless there's a bug in the kafka server we will have received
// the name of the topic that we requested.
if remain, err = discardString(r, remain); err != nil {
return
}
if remain, err = readInt32(r, remain, &n); err != nil {
return
}
// This error should never trigger, unless there's a bug in the kafka client
// or server.
if n != 1 {
err = fmt.Errorf("1 kafka partition was expected in the fetch response but the client received %d", n)
return
}
if remain, err = read(r, remain, &p); err != nil {
return
}
var abortedTransactionLen int
if remain, err = readArrayLen(r, remain, &abortedTransactionLen); err != nil {
return
}
if abortedTransactionLen == -1 {
abortedTransactions = nil
} else {
abortedTransactions = make([]AbortedTransaction, abortedTransactionLen)
for i := 0; i < abortedTransactionLen; i++ {
if remain, err = read(r, remain, &abortedTransactions[i]); err != nil {
return
}
}
}
if p.ErrorCode != 0 {
err = Error(p.ErrorCode)
return
}
remain, err = readInt32(r, remain, &messageSetSize)
if err != nil {
return
}
// This error should never trigger, unless there's a bug in the kafka client
// or server.
if remain != int(messageSetSize) {
err = fmt.Errorf("the size of the message set in a fetch response doesn't match the number of remaining bytes (message set size = %d, remaining bytes = %d)", messageSetSize, remain)
return
}
watermark = p.HighwaterMarkOffset
return
}
func readFetchResponseHeaderV10(r *bufio.Reader, size int) (throttle int32, watermark int64, remain int, err error) {
var n int32
var errorCode int16
type AbortedTransaction struct {
ProducerId int64
FirstOffset int64
}
var p struct {
Partition int32
ErrorCode int16
HighwaterMarkOffset int64
LastStableOffset int64
LogStartOffset int64
}
var messageSetSize int32
var abortedTransactions []AbortedTransaction
if remain, err = readInt32(r, size, &throttle); err != nil {
return
}
if remain, err = readInt16(r, remain, &errorCode); err != nil {
return
}
if errorCode != 0 {
err = Error(errorCode)
return
}
if remain, err = discardInt32(r, remain); err != nil {
return
}
if remain, err = readInt32(r, remain, &n); err != nil {
return
}
// This error should never trigger, unless there's a bug in the kafka client
// or server.
if n != 1 {
err = fmt.Errorf("1 kafka topic was expected in the fetch response but the client received %d", n)
return
}
// We ignore the topic name because we've requests messages for a single
// topic, unless there's a bug in the kafka server we will have received
// the name of the topic that we requested.
if remain, err = discardString(r, remain); err != nil {
return
}
if remain, err = readInt32(r, remain, &n); err != nil {
return
}
// This error should never trigger, unless there's a bug in the kafka client
// or server.
if n != 1 {
err = fmt.Errorf("1 kafka partition was expected in the fetch response but the client received %d", n)
return
}
if remain, err = read(r, remain, &p); err != nil {
return
}
var abortedTransactionLen int
if remain, err = readArrayLen(r, remain, &abortedTransactionLen); err != nil {
return
}
if abortedTransactionLen == -1 {
abortedTransactions = nil
} else {
abortedTransactions = make([]AbortedTransaction, abortedTransactionLen)
for i := 0; i < abortedTransactionLen; i++ {
if remain, err = read(r, remain, &abortedTransactions[i]); err != nil {
return
}
}
}
if p.ErrorCode != 0 {
err = Error(p.ErrorCode)
return
}
remain, err = readInt32(r, remain, &messageSetSize)
if err != nil {
return
}
// This error should never trigger, unless there's a bug in the kafka client
// or server.
if remain != int(messageSetSize) {
err = fmt.Errorf("the size of the message set in a fetch response doesn't match the number of remaining bytes (message set size = %d, remaining bytes = %d)", messageSetSize, remain)
return
}
watermark = p.HighwaterMarkOffset
return
}
func readMessageHeader(r *bufio.Reader, sz int) (offset int64, attributes int8, timestamp int64, remain int, err error) {
var version int8
if remain, err = readInt64(r, sz, &offset); err != nil {
return
}
// On discarding the message size and CRC:
// ---------------------------------------
//
// - Not sure why kafka gives the message size here, we already have the
// number of remaining bytes in the response and kafka should only truncate
// the trailing message.
//
// - TCP is already taking care of ensuring data integrity, no need to
// waste resources doing it a second time so we just skip the message CRC.
//
if remain, err = discardN(r, remain, 8); err != nil {
return
}
if remain, err = readInt8(r, remain, &version); err != nil {
return
}
if remain, err = readInt8(r, remain, &attributes); err != nil {
return
}
switch version {
case 0:
case 1:
remain, err = readInt64(r, remain, &timestamp)
default:
err = fmt.Errorf("unsupported message version %d found in fetch response", version)
}
return
}

View File

@@ -110,25 +110,6 @@ type RecordSet struct {
Records RecordReader
}
// bufferedReader is an interface implemented by types like bufio.Reader, which
// we use to optimize prefix reads by accessing the internal buffer directly
// through calls to Peek.
type bufferedReader interface {
Discard(int) (int, error)
Peek(int) ([]byte, error)
}
// bytesBuffer is an interface implemented by types like bytes.Buffer, which we
// use to optimize prefix reads by accessing the internal buffer directly
// through calls to Bytes.
type bytesBuffer interface {
Bytes() []byte
}
// magicByteOffset is the position of the magic byte in all versions of record
// sets in the kafka protocol.
const magicByteOffset = 16
// ReadFrom reads the representation of a record set from r into rs, returning
// the number of bytes consumed from r, and an non-nil error if the record set
// could not be read.
@@ -292,23 +273,7 @@ func (rs *RecordSet) WriteTo(w io.Writer) (int64, error) {
return 0, nil
}
func makeTime(t int64) time.Time {
return time.Unix(t/1000, (t%1000)*int64(time.Millisecond))
}
func timestamp(t time.Time) int64 {
if t.IsZero() {
return 0
}
return t.UnixNano() / int64(time.Millisecond)
}
func packUint32(u uint32) (b [4]byte) {
binary.BigEndian.PutUint32(b[:], u)
return
}
func packUint64(u uint64) (b [8]byte) {
binary.BigEndian.PutUint64(b[:], u)
return
}

View File

@@ -34,10 +34,6 @@ func valueOf(x interface{}) value {
return value{val: reflect.ValueOf(x).Elem()}
}
func makeValue(t reflect.Type) value {
return value{val: reflect.New(t).Elem()}
}
func (v value) bool() bool { return v.val.Bool() }
func (v value) int8() int8 { return int8(v.int64()) }
@@ -54,7 +50,7 @@ func (v value) bytes() []byte { return v.val.Bytes() }
func (v value) iface(t reflect.Type) interface{} { return v.val.Addr().Interface() }
func (v value) array(t reflect.Type) array { return array{val: v.val} }
func (v value) array(t reflect.Type) array { return array{val: v.val} } //nolint
func (v value) setBool(b bool) { v.val.SetBool(b) }

View File

@@ -64,19 +64,19 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer) (api
case Metadata:
var mt interface{}
var metadataRequest interface{}
if apiVersion >= 11 {
if apiVersion >= v11 {
types := makeTypes(reflect.TypeOf(&MetadataRequestV11{}).Elem())
mt = types[0]
metadataRequest = &MetadataRequestV11{}
} else if apiVersion >= 10 {
} else if apiVersion >= v10 {
types := makeTypes(reflect.TypeOf(&MetadataRequestV10{}).Elem())
mt = types[0]
metadataRequest = &MetadataRequestV10{}
} else if apiVersion >= 8 {
} else if apiVersion >= v8 {
types := makeTypes(reflect.TypeOf(&MetadataRequestV8{}).Elem())
mt = types[0]
metadataRequest = &MetadataRequestV8{}
} else if apiVersion >= 4 {
} else if apiVersion >= v4 {
types := makeTypes(reflect.TypeOf(&MetadataRequestV4{}).Elem())
mt = types[0]
metadataRequest = &MetadataRequestV4{}
@@ -87,11 +87,10 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer) (api
}
mt.(messageType).decode(d, valueOf(metadataRequest))
payload = metadataRequest
break
case ApiVersions:
var mt interface{}
var apiVersionsRequest interface{}
if apiVersion >= 3 {
if apiVersion >= v3 {
types := makeTypes(reflect.TypeOf(&ApiVersionsRequestV3{}).Elem())
mt = types[0]
apiVersionsRequest = &ApiVersionsRequestV3{}
@@ -102,11 +101,10 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer) (api
}
mt.(messageType).decode(d, valueOf(apiVersionsRequest))
payload = apiVersionsRequest
break
case Produce:
var mt interface{}
var produceRequest interface{}
if apiVersion >= 3 {
if apiVersion >= v3 {
types := makeTypes(reflect.TypeOf(&ProduceRequestV3{}).Elem())
mt = types[0]
produceRequest = &ProduceRequestV3{}
@@ -117,7 +115,6 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer) (api
}
mt.(messageType).decode(d, valueOf(produceRequest))
payload = produceRequest
break
case Fetch:
var mt interface{}
var fetchRequest interface{}
@@ -125,23 +122,23 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer) (api
types := makeTypes(reflect.TypeOf(&FetchRequestV11{}).Elem())
mt = types[0]
fetchRequest = &FetchRequestV11{}
} else if apiVersion >= 9 {
} else if apiVersion >= v9 {
types := makeTypes(reflect.TypeOf(&FetchRequestV9{}).Elem())
mt = types[0]
fetchRequest = &FetchRequestV9{}
} else if apiVersion >= 7 {
} else if apiVersion >= v7 {
types := makeTypes(reflect.TypeOf(&FetchRequestV7{}).Elem())
mt = types[0]
fetchRequest = &FetchRequestV7{}
} else if apiVersion >= 5 {
} else if apiVersion >= v5 {
types := makeTypes(reflect.TypeOf(&FetchRequestV5{}).Elem())
mt = types[0]
fetchRequest = &FetchRequestV5{}
} else if apiVersion >= 4 {
} else if apiVersion >= v4 {
types := makeTypes(reflect.TypeOf(&FetchRequestV4{}).Elem())
mt = types[0]
fetchRequest = &FetchRequestV4{}
} else if apiVersion >= 3 {
} else if apiVersion >= v3 {
types := makeTypes(reflect.TypeOf(&FetchRequestV3{}).Elem())
mt = types[0]
fetchRequest = &FetchRequestV3{}
@@ -152,19 +149,18 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer) (api
}
mt.(messageType).decode(d, valueOf(fetchRequest))
payload = fetchRequest
break
case ListOffsets:
var mt interface{}
var listOffsetsRequest interface{}
if apiVersion >= 4 {
if apiVersion >= v4 {
types := makeTypes(reflect.TypeOf(&ListOffsetsRequestV4{}).Elem())
mt = types[0]
listOffsetsRequest = &ListOffsetsRequestV4{}
} else if apiVersion >= 2 {
} else if apiVersion >= v2 {
types := makeTypes(reflect.TypeOf(&ListOffsetsRequestV2{}).Elem())
mt = types[0]
listOffsetsRequest = &ListOffsetsRequestV2{}
} else if apiVersion >= 1 {
} else if apiVersion >= v1 {
types := makeTypes(reflect.TypeOf(&ListOffsetsRequestV1{}).Elem())
mt = types[0]
listOffsetsRequest = &ListOffsetsRequestV1{}
@@ -175,11 +171,10 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer) (api
}
mt.(messageType).decode(d, valueOf(listOffsetsRequest))
payload = listOffsetsRequest
break
case CreateTopics:
var mt interface{}
var createTopicsRequest interface{}
if apiVersion >= 1 {
if apiVersion >= v1 {
types := makeTypes(reflect.TypeOf(&CreateTopicsRequestV1{}).Elem())
mt = types[0]
createTopicsRequest = &CreateTopicsRequestV1{}
@@ -190,11 +185,10 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer) (api
}
mt.(messageType).decode(d, valueOf(createTopicsRequest))
payload = createTopicsRequest
break
case DeleteTopics:
var mt interface{}
var deleteTopicsRequest interface{}
if apiVersion >= 6 {
if apiVersion >= v6 {
types := makeTypes(reflect.TypeOf(&DeleteTopicsRequestV6{}).Elem())
mt = types[0]
deleteTopicsRequest = &DeleteTopicsRequestV6{}
@@ -285,7 +279,7 @@ func WriteRequest(w io.Writer, apiVersion int16, correlationID int32, clientID s
if err == nil {
size := packUint32(uint32(b.Size()) - 4)
b.WriteAt(size[:], 0)
_, _ = b.WriteAt(size[:], 0)
_, err = b.WriteTo(w)
}

View File

@@ -62,35 +62,35 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer, emi
case Metadata:
var mt interface{}
var metadataResponse interface{}
if apiVersion >= 11 {
if apiVersion >= v11 {
types := makeTypes(reflect.TypeOf(&MetadataResponseV11{}).Elem())
mt = types[0]
metadataResponse = &MetadataResponseV11{}
} else if apiVersion >= 10 {
} else if apiVersion >= v10 {
types := makeTypes(reflect.TypeOf(&MetadataResponseV10{}).Elem())
mt = types[0]
metadataResponse = &MetadataResponseV10{}
} else if apiVersion >= 8 {
} else if apiVersion >= v8 {
types := makeTypes(reflect.TypeOf(&MetadataResponseV8{}).Elem())
mt = types[0]
metadataResponse = &MetadataResponseV8{}
} else if apiVersion >= 7 {
} else if apiVersion >= v7 {
types := makeTypes(reflect.TypeOf(&MetadataResponseV7{}).Elem())
mt = types[0]
metadataResponse = &MetadataResponseV7{}
} else if apiVersion >= 5 {
} else if apiVersion >= v5 {
types := makeTypes(reflect.TypeOf(&MetadataResponseV5{}).Elem())
mt = types[0]
metadataResponse = &MetadataResponseV5{}
} else if apiVersion >= 3 {
} else if apiVersion >= v3 {
types := makeTypes(reflect.TypeOf(&MetadataResponseV3{}).Elem())
mt = types[0]
metadataResponse = &MetadataResponseV3{}
} else if apiVersion >= 2 {
} else if apiVersion >= v2 {
types := makeTypes(reflect.TypeOf(&MetadataResponseV2{}).Elem())
mt = types[0]
metadataResponse = &MetadataResponseV2{}
} else if apiVersion >= 1 {
} else if apiVersion >= v1 {
types := makeTypes(reflect.TypeOf(&MetadataResponseV1{}).Elem())
mt = types[0]
metadataResponse = &MetadataResponseV1{}
@@ -101,11 +101,10 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer, emi
}
mt.(messageType).decode(d, valueOf(metadataResponse))
reqResPair.Response.Payload = metadataResponse
break
case ApiVersions:
var mt interface{}
var apiVersionsResponse interface{}
if apiVersion >= 1 {
if apiVersion >= v1 {
types := makeTypes(reflect.TypeOf(&ApiVersionsResponseV1{}).Elem())
mt = types[0]
apiVersionsResponse = &ApiVersionsResponseV1{}
@@ -116,23 +115,22 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer, emi
}
mt.(messageType).decode(d, valueOf(apiVersionsResponse))
reqResPair.Response.Payload = apiVersionsResponse
break
case Produce:
var mt interface{}
var produceResponse interface{}
if apiVersion >= 8 {
if apiVersion >= v8 {
types := makeTypes(reflect.TypeOf(&ProduceResponseV8{}).Elem())
mt = types[0]
produceResponse = &ProduceResponseV8{}
} else if apiVersion >= 5 {
} else if apiVersion >= v5 {
types := makeTypes(reflect.TypeOf(&ProduceResponseV5{}).Elem())
mt = types[0]
produceResponse = &ProduceResponseV5{}
} else if apiVersion >= 2 {
} else if apiVersion >= v2 {
types := makeTypes(reflect.TypeOf(&ProduceResponseV2{}).Elem())
mt = types[0]
produceResponse = &ProduceResponseV2{}
} else if apiVersion >= 1 {
} else if apiVersion >= v1 {
types := makeTypes(reflect.TypeOf(&ProduceResponseV1{}).Elem())
mt = types[0]
produceResponse = &ProduceResponseV1{}
@@ -143,27 +141,26 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer, emi
}
mt.(messageType).decode(d, valueOf(produceResponse))
reqResPair.Response.Payload = produceResponse
break
case Fetch:
var mt interface{}
var fetchResponse interface{}
if apiVersion >= 11 {
if apiVersion >= v11 {
types := makeTypes(reflect.TypeOf(&FetchResponseV11{}).Elem())
mt = types[0]
fetchResponse = &FetchResponseV11{}
} else if apiVersion >= 7 {
} else if apiVersion >= v7 {
types := makeTypes(reflect.TypeOf(&FetchResponseV7{}).Elem())
mt = types[0]
fetchResponse = &FetchResponseV7{}
} else if apiVersion >= 5 {
} else if apiVersion >= v5 {
types := makeTypes(reflect.TypeOf(&FetchResponseV5{}).Elem())
mt = types[0]
fetchResponse = &FetchResponseV5{}
} else if apiVersion >= 4 {
} else if apiVersion >= v4 {
types := makeTypes(reflect.TypeOf(&FetchResponseV4{}).Elem())
mt = types[0]
fetchResponse = &FetchResponseV4{}
} else if apiVersion >= 1 {
} else if apiVersion >= v1 {
types := makeTypes(reflect.TypeOf(&FetchResponseV1{}).Elem())
mt = types[0]
fetchResponse = &FetchResponseV1{}
@@ -174,19 +171,18 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer, emi
}
mt.(messageType).decode(d, valueOf(fetchResponse))
reqResPair.Response.Payload = fetchResponse
break
case ListOffsets:
var mt interface{}
var listOffsetsResponse interface{}
if apiVersion >= 4 {
if apiVersion >= v4 {
types := makeTypes(reflect.TypeOf(&ListOffsetsResponseV4{}).Elem())
mt = types[0]
listOffsetsResponse = &ListOffsetsResponseV4{}
} else if apiVersion >= 2 {
} else if apiVersion >= v2 {
types := makeTypes(reflect.TypeOf(&ListOffsetsResponseV2{}).Elem())
mt = types[0]
listOffsetsResponse = &ListOffsetsResponseV2{}
} else if apiVersion >= 1 {
} else if apiVersion >= v1 {
types := makeTypes(reflect.TypeOf(&ListOffsetsResponseV1{}).Elem())
mt = types[0]
listOffsetsResponse = &ListOffsetsResponseV1{}
@@ -200,19 +196,19 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer, emi
case CreateTopics:
var mt interface{}
var createTopicsResponse interface{}
if apiVersion >= 7 {
if apiVersion >= v7 {
types := makeTypes(reflect.TypeOf(&CreateTopicsResponseV0{}).Elem())
mt = types[0]
createTopicsResponse = &CreateTopicsResponseV0{}
} else if apiVersion >= 5 {
} else if apiVersion >= v5 {
types := makeTypes(reflect.TypeOf(&CreateTopicsResponseV5{}).Elem())
mt = types[0]
createTopicsResponse = &CreateTopicsResponseV5{}
} else if apiVersion >= 2 {
} else if apiVersion >= v2 {
types := makeTypes(reflect.TypeOf(&CreateTopicsResponseV2{}).Elem())
mt = types[0]
createTopicsResponse = &CreateTopicsResponseV2{}
} else if apiVersion >= 1 {
} else if apiVersion >= v1 {
types := makeTypes(reflect.TypeOf(&CreateTopicsResponseV1{}).Elem())
mt = types[0]
createTopicsResponse = &CreateTopicsResponseV1{}
@@ -223,19 +219,18 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer, emi
}
mt.(messageType).decode(d, valueOf(createTopicsResponse))
reqResPair.Response.Payload = createTopicsResponse
break
case DeleteTopics:
var mt interface{}
var deleteTopicsResponse interface{}
if apiVersion >= 6 {
if apiVersion >= v6 {
types := makeTypes(reflect.TypeOf(&DeleteTopicsReponseV6{}).Elem())
mt = types[0]
deleteTopicsResponse = &DeleteTopicsReponseV6{}
} else if apiVersion >= 5 {
} else if apiVersion >= v5 {
types := makeTypes(reflect.TypeOf(&DeleteTopicsReponseV5{}).Elem())
mt = types[0]
deleteTopicsResponse = &DeleteTopicsReponseV5{}
} else if apiVersion >= 1 {
} else if apiVersion >= v1 {
types := makeTypes(reflect.TypeOf(&DeleteTopicsReponseV1{}).Elem())
mt = types[0]
deleteTopicsResponse = &DeleteTopicsReponseV1{}
@@ -337,7 +332,7 @@ func WriteResponse(w io.Writer, apiVersion int16, correlationID int32, msg Messa
if err == nil {
size := packUint32(uint32(b.Size()) - 4)
b.WriteAt(size[:], 0)
_, _ = b.WriteAt(size[:], 0)
_, err = b.WriteTo(w)
}

View File

@@ -296,8 +296,6 @@ type MessageV0 struct {
Set *MessageSet `json:"set"` // the message set a message might wrap
Version int8 `json:"version"` // v1 requires Kafka 0.10
Timestamp time.Time `json:"timestamp"` // the timestamp of the message (version 1+ only)
compressedSize int // used for computing the compression ratio metrics
}
// MessageBlock represents a part of request with message

View File

@@ -52,9 +52,13 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
if isClient {
handleClientStream(tcpID, counterPair, superTimer, emitter, redisPacket)
err = handleClientStream(tcpID, counterPair, superTimer, emitter, redisPacket)
} else {
handleServerStream(tcpID, counterPair, superTimer, emitter, redisPacket)
err = handleServerStream(tcpID, counterPair, superTimer, emitter, redisPacket)
}
if err != nil {
return err
}
}
}
@@ -108,7 +112,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
func (d dissecting) Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) {
bodySize = 0
representation := make(map[string]interface{}, 0)
representation := make(map[string]interface{})
repRequest := representGeneric(request, `request.`)
repResponse := representGeneric(response, `response.`)
representation["request"] = repRequest

View File

@@ -4,11 +4,9 @@ import (
"bufio"
"errors"
"fmt"
"math"
"reflect"
"strconv"
"strings"
"time"
)
const (
@@ -18,90 +16,12 @@ const (
busyPrefix = "BUSY "
noscriptPrefix = "NOSCRIPT "
defaultHost = "localhost"
defaultPort = 6379
defaultSentinelPort = 26379
defaultTimeout = 5 * time.Second
defaultDatabase = 2 * time.Second
dollarByte = '$'
asteriskByte = '*'
plusByte = '+'
minusByte = '-'
colonByte = ':'
notApplicableByte = '0'
sentinelMasters = "masters"
sentinelGetMasterAddrByName = "get-master-addr-by-name"
sentinelReset = "reset"
sentinelSlaves = "slaves"
sentinelFailOver = "failover"
sentinelMonitor = "monitor"
sentinelRemove = "remove"
sentinelSet = "set"
clusterNodes = "nodes"
clusterMeet = "meet"
clusterReset = "reset"
clusterAddSlots = "addslots"
clusterDelSlots = "delslots"
clusterInfo = "info"
clusterGetKeysInSlot = "getkeysinslot"
clusterSetSlot = "setslot"
clusterSetSlotNode = "node"
clusterSetSlotMigrating = "migrating"
clusterSetSlotImporting = "importing"
clusterSetSlotStable = "stable"
clusterForget = "forget"
clusterFlushSlot = "flushslots"
clusterKeySlot = "keyslot"
clusterCountKeyInSlot = "countkeysinslot"
clusterSaveConfig = "saveconfig"
clusterReplicate = "replicate"
clusterSlaves = "slaves"
clusterFailOver = "failover"
clusterSlots = "slots"
pubSubChannels = "channels"
pubSubNumSub = "numsub"
pubSubNumPat = "numpat"
)
//intToByteArr convert int to byte array
func intToByteArr(a int) []byte {
buf := make([]byte, 0)
return strconv.AppendInt(buf, int64(a), 10)
}
var (
bytesTrue = intToByteArr(1)
bytesFalse = intToByteArr(0)
bytesTilde = []byte("~")
positiveInfinityBytes = []byte("+inf")
negativeInfinityBytes = []byte("-inf")
)
var (
sizeTable = []int{9, 99, 999, 9999, 99999, 999999, 9999999, 99999999,
999999999, math.MaxInt32}
digitTens = []byte{'0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '1',
'1', '1', '1', '1', '1', '1', '1', '1', '1', '2', '2', '2', '2', '2', '2', '2', '2', '2',
'2', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', '4', '4', '4', '4', '4', '4', '4',
'4', '4', '4', '5', '5', '5', '5', '5', '5', '5', '5', '5', '5', '6', '6', '6', '6', '6',
'6', '6', '6', '6', '6', '7', '7', '7', '7', '7', '7', '7', '7', '7', '7', '8', '8', '8',
'8', '8', '8', '8', '8', '8', '8', '9', '9', '9', '9', '9', '9', '9', '9', '9', '9'}
digitOnes = []byte{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0',
'1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8',
'9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6',
'7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4',
'5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2',
'3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9'}
digits = []byte{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a',
'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's',
't', 'u', 'v', 'w', 'x', 'y', 'z'}
)
// receive message from redis
@@ -197,12 +117,6 @@ func (r *RedisInputStream) readLineBytes() ([]byte, error) {
line := make([]byte, N)
j := 0
for i := r.count; i <= N; i++ {
if i >= len(buf) {
return nil, errors.New("Redis buffer index mismatch.")
}
if i >= len(line) {
return nil, errors.New("Redis line index mismatch.")
}
line[j] = buf[i]
j++
}
@@ -298,9 +212,9 @@ func (p *RedisProtocol) Read() (packet *RedisPacket, err error) {
packet = &RedisPacket{}
packet.Type = r
switch x.(type) {
switch v := x.(type) {
case []interface{}:
array := x.([]interface{})
array := v
if len(array) > 0 {
switch array[0].(type) {
case []uint8:
@@ -324,11 +238,11 @@ func (p *RedisProtocol) Read() (packet *RedisPacket, err error) {
if len(array) > 3 {
packet.Value = fmt.Sprintf("[%s", packet.Value)
for _, item := range array[3:] {
switch item.(type) {
switch j := item.(type) {
case []uint8:
packet.Value = fmt.Sprintf("%s, %s", packet.Value, item.([]uint8))
packet.Value = fmt.Sprintf("%s, %s", packet.Value, j)
case int64:
packet.Value = fmt.Sprintf("%s, %d", packet.Value, item.(int64))
packet.Value = fmt.Sprintf("%s, %d", packet.Value, j)
}
}
packet.Value = strings.TrimSuffix(packet.Value, ", ")
@@ -341,7 +255,7 @@ func (p *RedisProtocol) Read() (packet *RedisPacket, err error) {
}
}
case []uint8:
val := string(x.([]uint8))
val := string(v)
if packet.Type == types[plusByte] {
packet.Keyword = RedisKeyword(strings.ToUpper(val))
if !isValidRedisKeyword(keywords, packet.Keyword) {
@@ -352,9 +266,9 @@ func (p *RedisProtocol) Read() (packet *RedisPacket, err error) {
packet.Value = val
}
case string:
packet.Value = x.(string)
packet.Value = v
case int64:
packet.Value = fmt.Sprintf("%d", x.(int64))
packet.Value = fmt.Sprintf("%d", v)
default:
msg := fmt.Sprintf("Unrecognized Redis data type: %v", reflect.TypeOf(x))
err = errors.New(msg)
@@ -493,7 +407,7 @@ func (p *RedisProtocol) processError() (interface{}, error) {
func (p *RedisProtocol) parseTargetHostAndSlot(clusterRedirectResponse string) (host string, po int, slot int, err error) {
arr := strings.Split(clusterRedirectResponse, " ")
host, port := p.extractParts(arr[2])
slot, err = strconv.Atoi(arr[1])
slot, _ = strconv.Atoi(arr[1])
po, err = strconv.Atoi(port)
return
}

View File

@@ -96,6 +96,9 @@ func (h *tcpReader) run(wg *sync.WaitGroup) {
b := bufio.NewReader(h)
err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter, filteringOptions)
if err != nil {
io.Copy(ioutil.Discard, b) //nolint
_, err = io.Copy(ioutil.Discard, b)
if err != nil {
logger.Log.Errorf("%v", err)
}
}
}

View File

@@ -47,6 +47,7 @@ const EntryTitle: React.FC<any> = ({protocol, data, bodySize, elapsedTime}) => {
>
<div
style={{opacity: 0.5}}
id="entryDetailedTitleBodySize"
>
{formatSize(bodySize)}
</div>
@@ -58,6 +59,7 @@ const EntryTitle: React.FC<any> = ({protocol, data, bodySize, elapsedTime}) => {
>
<div
style={{opacity: 0.5}}
id="entryDetailedTitleElapsedTime"
>
{Math.round(elapsedTime)}ms
</div>

View File

@@ -329,7 +329,7 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus}) => {
/>
</div>
</div>
<div className={classes.details}>
<div className={classes.details} id="rightSideContainer">
{focusedEntryId && <EntryDetailed />}
</div>
</div>}