Compare commits

..

3 Commits

Author SHA1 Message Date
lirazyehezkel
bf8d5ed069 Support multiple workspaces (TRA-4365) (#945)
* support multiple workspaces

* reopen by websocket url dep

* open websocket only when websocketURL is changed

* upgrade common version

Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
2022-03-29 09:56:59 +03:00
Nimrod Gilboa Markevich
1f6e539590 Add commit message and committer to acceptance tests slack alert (#946)
* Add commit message and committer username to slack alerts
* Use name instead of username
* Use name and email
2022-03-29 09:15:42 +03:00
David Levanon
590fa08c81 EBPF error handling 2022-03-28 14:19:06 +03:00
29 changed files with 435 additions and 126 deletions

View File

@@ -43,7 +43,7 @@ jobs:
with:
status: ${{ job.status }}
notification_title: 'Mizu {workflow} has {status_message}'
message_format: '{emoji} *{workflow}* {status_message} during <{run_url}|run>, after commit: <{commit_url}|{commit_sha}>'
message_format: '{emoji} *{workflow}* {status_message} during <{run_url}|run>, after commit <{commit_url}|{commit_sha} ${{ github.event.head_commit.message }}> ${{ github.event.head_commit.committer.name }} <${{ github.event.head_commit.committer.email }}>'
footer: 'Linked Repo <{repo_url}|{repo}>'
notify_when: 'failure'
env:

View File

@@ -183,7 +183,6 @@ func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, re
}
} else {
resolvedSource = resolvedSourceObject.FullAddress
namespace = resolvedSourceObject.Namespace
}
unresolvedDestination := fmt.Sprintf("%s:%s", connectionInfo.ServerIP, connectionInfo.ServerPort)
@@ -195,11 +194,7 @@ func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, re
}
} else {
resolvedDestination = resolvedDestinationObject.FullAddress
// Overwrite namespace (if it was set according to the source)
// Only overwrite if non-empty
if resolvedDestinationObject.Namespace != "" {
namespace = resolvedDestinationObject.Namespace
}
namespace = resolvedDestinationObject.Namespace
}
}
return resolvedSource, resolvedDestination, namespace

View File

@@ -112,7 +112,7 @@ func UpdateTapTargets(newTapTargets []v1.Pod) {
tapTargets = newTapTargets
packetSourceManager.UpdatePods(tapTargets, !*nodefrag, mainPacketInputChan)
packetSourceManager.UpdatePods(tapTargets)
if tlsTapperInstance != nil {
if err := tlstapper.UpdateTapTargets(tlsTapperInstance, &tapTargets, *procfs); err != nil {
@@ -198,8 +198,12 @@ func initializePacketSources() error {
}
var err error
packetSourceManager, err = source.NewPacketSourceManager(*procfs, *fname, *iface, *servicemesh, tapTargets, behaviour, !*nodefrag, mainPacketInputChan)
return err
if packetSourceManager, err = source.NewPacketSourceManager(*procfs, *fname, *iface, *servicemesh, tapTargets, behaviour); err != nil {
return err
} else {
packetSourceManager.ReadPackets(!*nodefrag, mainPacketInputChan)
return nil
}
}
func initializePassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem) (*tcpStreamMap, *tcpAssembler) {
@@ -255,9 +259,10 @@ func startPassiveTapper(streamsMap *tcpStreamMap, assembler *tcpAssembler) {
func startTlsTapper(extension *api.Extension, outputItems chan *api.OutputChannelItem, options *api.TrafficFilteringOptions) *tlstapper.TlsTapper {
tls := tlstapper.TlsTapper{}
tlsPerfBufferSize := os.Getpagesize() * 100
chunksBufferSize := os.Getpagesize() * 100
logBufferSize := os.Getpagesize()
if err := tls.Init(tlsPerfBufferSize, *procfs, extension); err != nil {
if err := tls.Init(chunksBufferSize, logBufferSize, *procfs, extension); err != nil {
tlstapper.LogError(err)
return nil
}
@@ -281,6 +286,7 @@ func startTlsTapper(extension *api.Extension, outputItems chan *api.OutputChanne
OutputChannel: outputItems,
}
go tls.PollForLogging()
go tls.Poll(emitter, options)
return &tls

View File

@@ -24,7 +24,7 @@ type PacketSourceManager struct {
}
func NewPacketSourceManager(procfs string, filename string, interfaceName string,
mtls bool, pods []v1.Pod, behaviour TcpPacketSourceBehaviour, ipdefrag bool, packets chan<- TcpPacketInfo) (*PacketSourceManager, error) {
mtls bool, pods []v1.Pod, behaviour TcpPacketSourceBehaviour) (*PacketSourceManager, error) {
hostSource, err := newHostPacketSource(filename, interfaceName, behaviour)
if err != nil {
return nil, err
@@ -43,7 +43,7 @@ func NewPacketSourceManager(procfs string, filename string, interfaceName string
behaviour: behaviour,
}
go hostSource.readPackets(ipdefrag, packets)
sourceManager.UpdatePods(pods)
return sourceManager, nil
}
@@ -64,16 +64,16 @@ func newHostPacketSource(filename string, interfaceName string,
return source, nil
}
func (m *PacketSourceManager) UpdatePods(pods []v1.Pod, ipdefrag bool, packets chan<- TcpPacketInfo) {
func (m *PacketSourceManager) UpdatePods(pods []v1.Pod) {
if m.config.mtls {
m.updateMtlsPods(m.config.procfs, pods, m.config.interfaceName, m.config.behaviour, ipdefrag, packets)
m.updateMtlsPods(m.config.procfs, pods, m.config.interfaceName, m.config.behaviour)
}
m.setBPFFilter(pods)
}
func (m *PacketSourceManager) updateMtlsPods(procfs string, pods []v1.Pod,
interfaceName string, behaviour TcpPacketSourceBehaviour, ipdefrag bool, packets chan<- TcpPacketInfo) {
interfaceName string, behaviour TcpPacketSourceBehaviour) {
relevantPids := m.getRelevantPids(procfs, pods)
logger.Log.Infof("Updating mtls pods (new: %v) (current: %v)", relevantPids, m.sources)
@@ -90,7 +90,6 @@ func (m *PacketSourceManager) updateMtlsPods(procfs string, pods []v1.Pod,
source, err := newNetnsPacketSource(procfs, pid, interfaceName, behaviour)
if err == nil {
go source.readPackets(ipdefrag, packets)
m.sources[pid] = source
}
}
@@ -154,6 +153,12 @@ func (m *PacketSourceManager) setBPFFilter(pods []v1.Pod) {
}
}
func (m *PacketSourceManager) ReadPackets(ipdefrag bool, packets chan<- TcpPacketInfo) {
for _, src := range m.sources {
go src.readPackets(ipdefrag, packets)
}
}
func (m *PacketSourceManager) Close() {
for _, src := range m.sources {
src.close()

View File

@@ -7,8 +7,12 @@ Copyright (C) UP9 Inc.
#include "include/headers.h"
#include "include/util.h"
#include "include/maps.h"
#include "include/log.h"
#include "include/logger_messages.h"
#include "include/pids.h"
#define IPV4_ADDR_LEN (16)
struct accept_info {
__u64* sockaddr;
__u32* addrlen;
@@ -41,9 +45,7 @@ void sys_enter_accept4(struct sys_enter_accept4_ctx *ctx) {
long err = bpf_map_update_elem(&accept_syscall_context, &id, &info, BPF_ANY);
if (err != 0) {
char msg[] = "Error putting accept info (id: %ld) (err: %ld)";
bpf_trace_printk(msg, sizeof(msg), id, err);
return;
log_error(ctx, LOG_ERROR_PUTTING_ACCEPT_INFO, id, err, 0l);
}
}
@@ -70,6 +72,7 @@ void sys_exit_accept4(struct sys_exit_accept4_ctx *ctx) {
struct accept_info *infoPtr = bpf_map_lookup_elem(&accept_syscall_context, &id);
if (infoPtr == NULL) {
log_error(ctx, LOG_ERROR_GETTING_ACCEPT_INFO, id, 0l, 0l);
return;
}
@@ -79,15 +82,14 @@ void sys_exit_accept4(struct sys_exit_accept4_ctx *ctx) {
bpf_map_delete_elem(&accept_syscall_context, &id);
if (err != 0) {
char msg[] = "Error reading accept info from accept syscall (id: %ld) (err: %ld)";
bpf_trace_printk(msg, sizeof(msg), id, err);
log_error(ctx, LOG_ERROR_READING_ACCEPT_INFO, id, err, 0l);
return;
}
__u32 addrlen;
bpf_probe_read(&addrlen, sizeof(__u32), info.addrlen);
if (addrlen != 16) {
if (addrlen != IPV4_ADDR_LEN) {
// Currently only ipv4 is supported linux-src/include/linux/inet.h
return;
}
@@ -105,9 +107,7 @@ void sys_exit_accept4(struct sys_exit_accept4_ctx *ctx) {
err = bpf_map_update_elem(&file_descriptor_to_ipv4, &key, &fdinfo, BPF_ANY);
if (err != 0) {
char msg[] = "Error putting fd to address mapping from accept (key: %ld) (err: %ld)";
bpf_trace_printk(msg, sizeof(msg), key, err);
return;
log_error(ctx, LOG_ERROR_PUTTING_FD_MAPPING, id, err, ORIGIN_SYS_EXIT_ACCEPT4_CODE);
}
}
@@ -145,9 +145,7 @@ void sys_enter_connect(struct sys_enter_connect_ctx *ctx) {
long err = bpf_map_update_elem(&connect_syscall_info, &id, &info, BPF_ANY);
if (err != 0) {
char msg[] = "Error putting connect info (id: %ld) (err: %ld)";
bpf_trace_printk(msg, sizeof(msg), id, err);
return;
log_error(ctx, LOG_ERROR_PUTTING_CONNECT_INFO, id, err, 0l);
}
}
@@ -176,6 +174,7 @@ void sys_exit_connect(struct sys_exit_connect_ctx *ctx) {
struct connect_info *infoPtr = bpf_map_lookup_elem(&connect_syscall_info, &id);
if (infoPtr == NULL) {
log_error(ctx, LOG_ERROR_GETTING_CONNECT_INFO, id, 0l, 0l);
return;
}
@@ -185,12 +184,11 @@ void sys_exit_connect(struct sys_exit_connect_ctx *ctx) {
bpf_map_delete_elem(&connect_syscall_info, &id);
if (err != 0) {
char msg[] = "Error reading connect info from connect syscall (id: %ld) (err: %ld)";
bpf_trace_printk(msg, sizeof(msg), id, err);
log_error(ctx, LOG_ERROR_READING_CONNECT_INFO, id, err, 0l);
return;
}
if (info.addrlen != 16) {
if (info.addrlen != IPV4_ADDR_LEN) {
// Currently only ipv4 is supported linux-src/include/linux/inet.h
return;
}
@@ -208,8 +206,6 @@ void sys_exit_connect(struct sys_exit_connect_ctx *ctx) {
err = bpf_map_update_elem(&file_descriptor_to_ipv4, &key, &fdinfo, BPF_ANY);
if (err != 0) {
char msg[] = "Error putting fd to address mapping from connect (key: %ld) (err: %ld)";
bpf_trace_printk(msg, sizeof(msg), key, err);
return;
log_error(ctx, LOG_ERROR_PUTTING_FD_MAPPING, id, err, ORIGIN_SYS_EXIT_CONNECT_CODE);
}
}

View File

@@ -7,6 +7,8 @@ Copyright (C) UP9 Inc.
#include "include/headers.h"
#include "include/util.h"
#include "include/maps.h"
#include "include/log.h"
#include "include/logger_messages.h"
#include "include/pids.h"
struct sys_enter_read_ctx {
@@ -36,8 +38,7 @@ void sys_enter_read(struct sys_enter_read_ctx *ctx) {
long err = bpf_probe_read(&info, sizeof(struct ssl_info), infoPtr);
if (err != 0) {
char msg[] = "Error reading read info from read syscall (id: %ld) (err: %ld)";
bpf_trace_printk(msg, sizeof(msg), id, err);
log_error(ctx, LOG_ERROR_READING_SSL_CONTEXT, id, err, ORIGIN_SYS_ENTER_READ_CODE);
return;
}
@@ -46,9 +47,7 @@ void sys_enter_read(struct sys_enter_read_ctx *ctx) {
err = bpf_map_update_elem(&ssl_read_context, &id, &info, BPF_ANY);
if (err != 0) {
char msg[] = "Error putting file descriptor from read syscall (id: %ld) (err: %ld)";
bpf_trace_printk(msg, sizeof(msg), id, err);
return;
log_error(ctx, LOG_ERROR_PUTTING_FILE_DESCRIPTOR, id, err, ORIGIN_SYS_ENTER_READ_CODE);
}
}
@@ -79,8 +78,7 @@ void sys_enter_write(struct sys_enter_write_ctx *ctx) {
long err = bpf_probe_read(&info, sizeof(struct ssl_info), infoPtr);
if (err != 0) {
char msg[] = "Error reading write context from write syscall (id: %ld) (err: %ld)";
bpf_trace_printk(msg, sizeof(msg), id, err);
log_error(ctx, LOG_ERROR_READING_SSL_CONTEXT, id, err, ORIGIN_SYS_ENTER_WRITE_CODE);
return;
}
@@ -89,8 +87,6 @@ void sys_enter_write(struct sys_enter_write_ctx *ctx) {
err = bpf_map_update_elem(&ssl_write_context, &id, &info, BPF_ANY);
if (err != 0) {
char msg[] = "Error putting file descriptor from write syscall (id: %ld) (err: %ld)";
bpf_trace_printk(msg, sizeof(msg), id, err);
return;
log_error(ctx, LOG_ERROR_PUTTING_FILE_DESCRIPTOR, id, err, ORIGIN_SYS_ENTER_WRITE_CODE);
}
}

View File

@@ -0,0 +1,79 @@
/*
Note: This file is licenced differently from the rest of the project
SPDX-License-Identifier: GPL-2.0
Copyright (C) UP9 Inc.
*/
#ifndef __LOG__
#define __LOG__
// The same consts defined in bpf_logger.go
//
#define LOG_LEVEL_ERROR (0)
#define LOG_LEVEL_INFO (1)
#define LOG_LEVEL_DEBUG (2)
// The same struct can be found in bpf_logger.go
//
// Be careful when editing, alignment and padding should be exactly the same in go/c.
//
struct log_message {
__u32 level;
__u32 message_code;
__u64 arg1;
__u64 arg2;
__u64 arg3;
};
static __always_inline void log_error(void* ctx, __u16 message_code, __u64 arg1, __u64 arg2, __u64 arg3) {
struct log_message entry = {};
entry.level = LOG_LEVEL_ERROR;
entry.message_code = message_code;
entry.arg1 = arg1;
entry.arg2 = arg2;
entry.arg3 = arg3;
long err = bpf_perf_event_output(ctx, &log_buffer, BPF_F_CURRENT_CPU, &entry, sizeof(struct log_message));
if (err != 0) {
char msg[] = "Error writing log error to perf buffer - %ld";
bpf_trace_printk(msg, sizeof(msg), err);
}
}
static __always_inline void log_info(void* ctx, __u16 message_code, __u64 arg1, __u64 arg2, __u64 arg3) {
struct log_message entry = {};
entry.level = LOG_LEVEL_INFO;
entry.message_code = message_code;
entry.arg1 = arg1;
entry.arg2 = arg2;
entry.arg3 = arg3;
long err = bpf_perf_event_output(ctx, &log_buffer, BPF_F_CURRENT_CPU, &entry, sizeof(struct log_message));
if (err != 0) {
char msg[] = "Error writing log info to perf buffer - %ld";
bpf_trace_printk(msg, sizeof(msg), arg1, err);
}
}
static __always_inline void log_debug(void* ctx, __u16 message_code, __u64 arg1, __u64 arg2, __u64 arg3) {
struct log_message entry = {};
entry.level = LOG_LEVEL_DEBUG;
entry.message_code = message_code;
entry.arg1 = arg1;
entry.arg2 = arg2;
entry.arg3 = arg3;
long err = bpf_perf_event_output(ctx, &log_buffer, BPF_F_CURRENT_CPU, &entry, sizeof(struct log_message));
if (err != 0) {
char msg[] = "Error writing log debug to perf buffer - %ld";
bpf_trace_printk(msg, sizeof(msg), arg1, err);
}
}
#endif /* __LOG__ */

View File

@@ -0,0 +1,42 @@
/*
Note: This file is licenced differently from the rest of the project
SPDX-License-Identifier: GPL-2.0
Copyright (C) UP9 Inc.
*/
#ifndef __LOG_MESSAGES__
#define __LOG_MESSAGES__
// Must be synced with bpf_logger_messages.go
//
#define LOG_ERROR_READING_BYTES_COUNT (0)
#define LOG_ERROR_READING_FD_ADDRESS (1)
#define LOG_ERROR_READING_FROM_SSL_BUFFER (2)
#define LOG_ERROR_BUFFER_TOO_BIG (3)
#define LOG_ERROR_ALLOCATING_CHUNK (4)
#define LOG_ERROR_READING_SSL_CONTEXT (5)
#define LOG_ERROR_PUTTING_SSL_CONTEXT (6)
#define LOG_ERROR_GETTING_SSL_CONTEXT (7)
#define LOG_ERROR_MISSING_FILE_DESCRIPTOR (8)
#define LOG_ERROR_PUTTING_FILE_DESCRIPTOR (9)
#define LOG_ERROR_PUTTING_ACCEPT_INFO (10)
#define LOG_ERROR_GETTING_ACCEPT_INFO (11)
#define LOG_ERROR_READING_ACCEPT_INFO (12)
#define LOG_ERROR_PUTTING_FD_MAPPING (13)
#define LOG_ERROR_PUTTING_CONNECT_INFO (14)
#define LOG_ERROR_GETTING_CONNECT_INFO (15)
#define LOG_ERROR_READING_CONNECT_INFO (16)
// Sometimes we have the same error, happening from different locations.
// in order to be able to distinct between them in the log, we add an
// extra number that identify the location. The number can be anything,
// but do not give the same number to different origins.
//
#define ORIGIN_SSL_UPROBE_CODE (0l)
#define ORIGIN_SSL_URETPROBE_CODE (1l)
#define ORIGIN_SYS_ENTER_READ_CODE (2l)
#define ORIGIN_SYS_ENTER_WRITE_CODE (3l)
#define ORIGIN_SYS_EXIT_ACCEPT4_CODE (4l)
#define ORIGIN_SYS_EXIT_CONNECT_CODE (5l)
#endif /* __LOG_MESSAGES__ */

View File

@@ -70,5 +70,6 @@ BPF_LRU_HASH(ssl_write_context, __u64, struct ssl_info);
BPF_LRU_HASH(ssl_read_context, __u64, struct ssl_info);
BPF_HASH(file_descriptor_to_ipv4, __u64, struct fd_info);
BPF_PERF_OUTPUT(chunks_buffer);
BPF_PERF_OUTPUT(log_buffer);
#endif /* __MAPS__ */

View File

@@ -7,6 +7,8 @@ Copyright (C) UP9 Inc.
#include "include/headers.h"
#include "include/util.h"
#include "include/maps.h"
#include "include/log.h"
#include "include/logger_messages.h"
#include "include/pids.h"
// Heap-like area for eBPF programs - stack size limited to 512 bytes, we must use maps for bigger (chunk) objects.
@@ -39,15 +41,14 @@ static __always_inline int get_count_bytes(struct pt_regs *ctx, struct ssl_info*
long err = bpf_probe_read(&countBytes, sizeof(size_t), (void*) info->count_ptr);
if (err != 0) {
char msg[] = "Error reading bytes count of _ex (id: %ld) (err: %ld)";
bpf_trace_printk(msg, sizeof(msg), id, err);
log_error(ctx, LOG_ERROR_READING_BYTES_COUNT, id, err, 0l);
return 0;
}
return countBytes;
}
static __always_inline void add_address_to_chunk(struct tlsChunk* chunk, __u64 id, __u32 fd) {
static __always_inline void add_address_to_chunk(struct pt_regs *ctx, struct tlsChunk* chunk, __u64 id, __u32 fd) {
__u32 pid = id >> 32;
__u64 key = (__u64) pid << 32 | fd;
@@ -61,8 +62,7 @@ static __always_inline void add_address_to_chunk(struct tlsChunk* chunk, __u64 i
chunk->flags |= (fdinfo->flags & FLAGS_IS_CLIENT_BIT);
if (err != 0) {
char msg[] = "Error reading from fd address %ld - %ld";
bpf_trace_printk(msg, sizeof(msg), id, err);
log_error(ctx, LOG_ERROR_READING_FD_ADDRESS, id, err, 0l);
}
}
@@ -88,8 +88,7 @@ static __always_inline void send_chunk_part(struct pt_regs *ctx, __u8* buffer, _
}
if (err != 0) {
char msg[] = "Error reading from ssl buffer %ld - %ld";
bpf_trace_printk(msg, sizeof(msg), id, err);
log_error(ctx, LOG_ERROR_READING_FROM_SSL_BUFFER, id, err, 0l);
return;
}
@@ -101,8 +100,9 @@ static __always_inline void send_chunk(struct pt_regs *ctx, __u8* buffer, __u64
//
// https://lwn.net/Articles/794934/
//
// If we want to compile in kernel older than 5.3, we should add "#pragma unroll" to this loop
// However we want to run in kernel older than 5.3, hence we use "#pragma unroll" anyway
//
#pragma unroll
for (int i = 0; i < MAX_CHUNKS_PER_OPERATION; i++) {
if (chunk->len <= (CHUNK_SIZE * i)) {
break;
@@ -120,8 +120,7 @@ static __always_inline void output_ssl_chunk(struct pt_regs *ctx, struct ssl_inf
}
if (countBytes > (CHUNK_SIZE * MAX_CHUNKS_PER_OPERATION)) {
char msg[] = "Buffer too big %d (id: %ld)";
bpf_trace_printk(msg, sizeof(msg), countBytes, id);
log_error(ctx, LOG_ERROR_BUFFER_TOO_BIG, id, countBytes, 0l);
return;
}
@@ -134,8 +133,7 @@ static __always_inline void output_ssl_chunk(struct pt_regs *ctx, struct ssl_inf
chunk = bpf_map_lookup_elem(&heap, &zero);
if (!chunk) {
char msg[] = "Unable to allocate chunk (id: %ld)";
bpf_trace_printk(msg, sizeof(msg), id);
log_error(ctx, LOG_ERROR_ALLOCATING_CHUNK, id, 0l, 0l);
return;
}
@@ -145,11 +143,11 @@ static __always_inline void output_ssl_chunk(struct pt_regs *ctx, struct ssl_inf
chunk->len = countBytes;
chunk->fd = info->fd;
add_address_to_chunk(chunk, id, chunk->fd);
add_address_to_chunk(ctx, chunk, id, chunk->fd);
send_chunk(ctx, info->buffer, id, chunk);
}
static __always_inline void ssl_uprobe(void* ssl, void* buffer, int num, struct bpf_map_def* map_fd, size_t *count_ptr) {
static __always_inline void ssl_uprobe(struct pt_regs *ctx, void* ssl, void* buffer, int num, struct bpf_map_def* map_fd, size_t *count_ptr) {
__u64 id = bpf_get_current_pid_tgid();
if (!should_tap(id >> 32)) {
@@ -166,8 +164,7 @@ static __always_inline void ssl_uprobe(void* ssl, void* buffer, int num, struct
long err = bpf_probe_read(&info, sizeof(struct ssl_info), infoPtr);
if (err != 0) {
char msg[] = "Error reading old ssl context (id: %ld) (err: %ld)";
bpf_trace_printk(msg, sizeof(msg), id, err);
log_error(ctx, LOG_ERROR_READING_SSL_CONTEXT, id, err, ORIGIN_SSL_UPROBE_CODE);
}
if ((bpf_ktime_get_ns() - info.created_at_nano) > SSL_INFO_MAX_TTL_NANO) {
@@ -184,8 +181,7 @@ static __always_inline void ssl_uprobe(void* ssl, void* buffer, int num, struct
long err = bpf_map_update_elem(map_fd, &id, &info, BPF_ANY);
if (err != 0) {
char msg[] = "Error putting ssl context (id: %ld) (err: %ld)";
bpf_trace_printk(msg, sizeof(msg), id, err);
log_error(ctx, LOG_ERROR_PUTTING_SSL_CONTEXT, id, err, 0l);
}
}
@@ -199,8 +195,7 @@ static __always_inline void ssl_uretprobe(struct pt_regs *ctx, struct bpf_map_de
struct ssl_info *infoPtr = bpf_map_lookup_elem(map_fd, &id);
if (infoPtr == NULL) {
char msg[] = "Error getting ssl context info (id: %ld)";
bpf_trace_printk(msg, sizeof(msg), id);
log_error(ctx, LOG_ERROR_GETTING_SSL_CONTEXT, id, 0l, 0l);
return;
}
@@ -220,14 +215,12 @@ static __always_inline void ssl_uretprobe(struct pt_regs *ctx, struct bpf_map_de
// bpf_map_delete_elem(map_fd, &id);
if (err != 0) {
char msg[] = "Error reading ssl context (id: %ld) (err: %ld)";
bpf_trace_printk(msg, sizeof(msg), id, err);
log_error(ctx, LOG_ERROR_READING_SSL_CONTEXT, id, err, ORIGIN_SSL_URETPROBE_CODE);
return;
}
if (info.fd == -1) {
char msg[] = "File descriptor is missing from ssl info (id: %ld)";
bpf_trace_printk(msg, sizeof(msg), id);
log_error(ctx, LOG_ERROR_MISSING_FILE_DESCRIPTOR, id, 0l, 0l);
return;
}
@@ -236,7 +229,7 @@ static __always_inline void ssl_uretprobe(struct pt_regs *ctx, struct bpf_map_de
SEC("uprobe/ssl_write")
void BPF_KPROBE(ssl_write, void* ssl, void* buffer, int num) {
ssl_uprobe(ssl, buffer, num, &ssl_write_context, 0);
ssl_uprobe(ctx, ssl, buffer, num, &ssl_write_context, 0);
}
SEC("uretprobe/ssl_write")
@@ -246,7 +239,7 @@ void BPF_KPROBE(ssl_ret_write) {
SEC("uprobe/ssl_read")
void BPF_KPROBE(ssl_read, void* ssl, void* buffer, int num) {
ssl_uprobe(ssl, buffer, num, &ssl_read_context, 0);
ssl_uprobe(ctx, ssl, buffer, num, &ssl_read_context, 0);
}
SEC("uretprobe/ssl_read")
@@ -256,7 +249,7 @@ void BPF_KPROBE(ssl_ret_read) {
SEC("uprobe/ssl_write_ex")
void BPF_KPROBE(ssl_write_ex, void* ssl, void* buffer, size_t num, size_t *written) {
ssl_uprobe(ssl, buffer, num, &ssl_write_context, written);
ssl_uprobe(ctx, ssl, buffer, num, &ssl_write_context, written);
}
SEC("uretprobe/ssl_write_ex")
@@ -266,7 +259,7 @@ void BPF_KPROBE(ssl_ret_write_ex) {
SEC("uprobe/ssl_read_ex")
void BPF_KPROBE(ssl_read_ex, void* ssl, void* buffer, size_t num, size_t *readbytes) {
ssl_uprobe(ssl, buffer, num, &ssl_read_context, readbytes);
ssl_uprobe(ctx, ssl, buffer, num, &ssl_read_context, readbytes);
}
SEC("uretprobe/ssl_read_ex")

View File

@@ -7,6 +7,8 @@ Copyright (C) UP9 Inc.
#include "include/headers.h"
#include "include/util.h"
#include "include/maps.h"
#include "include/log.h"
#include "include/logger_messages.h"
#include "include/pids.h"
// To avoid multiple .o files

116
tap/tlstapper/bpf_logger.go Normal file
View File

@@ -0,0 +1,116 @@
package tlstapper
import (
"bytes"
"encoding/binary"
"strings"
"github.com/cilium/ebpf/perf"
"github.com/go-errors/errors"
"github.com/up9inc/mizu/shared/logger"
)
const logPrefix = "[bpf] "
// The same consts defined in log.h
//
const logLevelError = 0
const logLevelInfo = 1
const logLevelDebug = 2
type logMessage struct {
Level uint32
MessageCode uint32
Arg1 uint64
Arg2 uint64
Arg3 uint64
}
type bpfLogger struct {
logReader *perf.Reader
}
func newBpfLogger() *bpfLogger {
return &bpfLogger{
logReader: nil,
}
}
func (p *bpfLogger) init(bpfObjects *tlsTapperObjects, bufferSize int) error {
var err error
p.logReader, err = perf.NewReader(bpfObjects.LogBuffer, bufferSize)
if err != nil {
return errors.Wrap(err, 0)
}
return nil
}
func (p *bpfLogger) close() error {
return p.logReader.Close()
}
func (p *bpfLogger) poll() {
logger.Log.Infof("Start polling for bpf logs")
for {
record, err := p.logReader.Read()
if err != nil {
if errors.Is(err, perf.ErrClosed) {
return
}
LogError(errors.Errorf("Error reading from bpf logger perf buffer, aboring logger! %w", err))
return
}
if record.LostSamples != 0 {
logger.Log.Infof("Log buffer is full, dropped %d logs", record.LostSamples)
continue
}
buffer := bytes.NewReader(record.RawSample)
var log logMessage
if err := binary.Read(buffer, binary.LittleEndian, &log); err != nil {
LogError(errors.Errorf("Error parsing log %v", err))
continue
}
p.log(&log)
}
}
func (p *bpfLogger) log(log *logMessage) {
if int(log.MessageCode) >= len(bpfLogMessages) {
logger.Log.Errorf("Unknown message code from bpf logger %d", log.MessageCode)
return
}
format := bpfLogMessages[log.MessageCode]
tokensCount := strings.Count(format, "%")
if tokensCount == 0 {
p.logLevel(log.Level, format)
} else if tokensCount == 1 {
p.logLevel(log.Level, format, log.Arg1)
} else if tokensCount == 2 {
p.logLevel(log.Level, format, log.Arg1, log.Arg2)
} else if tokensCount == 3 {
p.logLevel(log.Level, format, log.Arg1, log.Arg2, log.Arg3)
}
}
func (p *bpfLogger) logLevel(level uint32, format string, args ...interface{}) {
if level == logLevelError {
logger.Log.Errorf(logPrefix+format, args...)
} else if level == logLevelInfo {
logger.Log.Infof(logPrefix+format, args...)
} else if level == logLevelDebug {
logger.Log.Debugf(logPrefix+format, args...)
}
}

View File

@@ -0,0 +1,25 @@
package tlstapper
// Must be synced with logger_messages.h
//
var bpfLogMessages = []string {
/*0000*/ "[%d] Unable to read bytes count from _ex methods [err: %d]",
/*0001*/ "[%d] Unable to read ipv4 address [err: %d]",
/*0002*/ "[%d] Unable to read ssl buffer [err: %d]",
/*0003*/ "[%d] Buffer is too big [size: %d]",
/*0004*/ "[%d] Unable to allocate chunk in bpf heap",
/*0005*/ "[%d] Unable to read ssl context [err: %d] [origin: %d]",
/*0006*/ "[%d] Unable to put ssl context [err: %d]",
/*0007*/ "[%d] Unable to get ssl context",
/*0008*/ "[%d] File descriptor is missing for tls chunk",
/*0009*/ "[%d] Unable to put file descriptor [err: %d] [origin: %d]",
/*0010*/ "[%d] Unable to put accept info [err: %d]",
/*0011*/ "[%d] Unable to get accept info",
/*0012*/ "[%d] Unable to read accept info [err: %d]",
/*0013*/ "[%d] Unable to put file descriptor to address mapping [err: %d] [origin: %d]",
/*0014*/ "[%d] Unable to put connect info [err: %d]",
/*0015*/ "[%d] Unable to get connect info",
/*0016*/ "[%d] Unable to read connect info [err: %d]",
}

View File

@@ -8,6 +8,8 @@ import (
"sync"
)
const GLOABL_TAP_PID = 0
//go:generate go run github.com/cilium/ebpf/cmd/bpf2go tlsTapper bpf/tls_tapper.c -- -O2 -g -D__TARGET_ARCH_x86
type TlsTapper struct {
@@ -15,11 +17,12 @@ type TlsTapper struct {
syscallHooks syscallHooks
sslHooksStructs []sslHooks
poller *tlsPoller
bpfLogger *bpfLogger
registeredPids sync.Map
}
func (t *TlsTapper) Init(bufferSize int, procfs string, extension *api.Extension) error {
logger.Log.Infof("Initializing tls tapper (bufferSize: %v)", bufferSize)
func (t *TlsTapper) Init(chunksBufferSize int, logBufferSize int, procfs string, extension *api.Extension) error {
logger.Log.Infof("Initializing tls tapper (chunksSize: %d) (logSize: %d)", chunksBufferSize, logBufferSize)
if err := setupRLimit(); err != nil {
return err
@@ -37,16 +40,25 @@ func (t *TlsTapper) Init(bufferSize int, procfs string, extension *api.Extension
t.sslHooksStructs = make([]sslHooks, 0)
t.bpfLogger = newBpfLogger()
if err := t.bpfLogger.init(&t.bpfObjects, logBufferSize); err != nil {
return err
}
t.poller = newTlsPoller(t, extension, procfs)
return t.poller.init(&t.bpfObjects, bufferSize)
return t.poller.init(&t.bpfObjects, chunksBufferSize)
}
func (t *TlsTapper) Poll(emitter api.Emitter, options *api.TrafficFilteringOptions) {
t.poller.poll(emitter, options)
}
func (t *TlsTapper) PollForLogging() {
t.bpfLogger.poll()
}
func (t *TlsTapper) GlobalTap(sslLibrary string) error {
return t.tapPid(0, sslLibrary)
return t.tapPid(GLOABL_TAP_PID, sslLibrary)
}
func (t *TlsTapper) AddPid(procfs string, pid uint32) error {
@@ -74,7 +86,12 @@ func (t *TlsTapper) RemovePid(pid uint32) error {
func (t *TlsTapper) ClearPids() {
t.registeredPids.Range(func(key, v interface{}) bool {
if err := t.RemovePid(key.(uint32)); err != nil {
pid := key.(uint32)
if pid == GLOABL_TAP_PID {
return true
}
if err := t.RemovePid(pid); err != nil {
LogError(err)
}
t.registeredPids.Delete(key)
@@ -95,6 +112,10 @@ func (t *TlsTapper) Close() []error {
errors = append(errors, sslHooks.close()...)
}
if err := t.bpfLogger.close(); err != nil {
errors = append(errors, err)
}
if err := t.poller.close(); err != nil {
errors = append(errors, err)
}

View File

@@ -78,6 +78,7 @@ type tlsTapperMapSpecs struct {
ConnectSyscallInfo *ebpf.MapSpec `ebpf:"connect_syscall_info"`
FileDescriptorToIpv4 *ebpf.MapSpec `ebpf:"file_descriptor_to_ipv4"`
Heap *ebpf.MapSpec `ebpf:"heap"`
LogBuffer *ebpf.MapSpec `ebpf:"log_buffer"`
PidsMap *ebpf.MapSpec `ebpf:"pids_map"`
SslReadContext *ebpf.MapSpec `ebpf:"ssl_read_context"`
SslWriteContext *ebpf.MapSpec `ebpf:"ssl_write_context"`
@@ -107,6 +108,7 @@ type tlsTapperMaps struct {
ConnectSyscallInfo *ebpf.Map `ebpf:"connect_syscall_info"`
FileDescriptorToIpv4 *ebpf.Map `ebpf:"file_descriptor_to_ipv4"`
Heap *ebpf.Map `ebpf:"heap"`
LogBuffer *ebpf.Map `ebpf:"log_buffer"`
PidsMap *ebpf.Map `ebpf:"pids_map"`
SslReadContext *ebpf.Map `ebpf:"ssl_read_context"`
SslWriteContext *ebpf.Map `ebpf:"ssl_write_context"`
@@ -119,6 +121,7 @@ func (m *tlsTapperMaps) Close() error {
m.ConnectSyscallInfo,
m.FileDescriptorToIpv4,
m.Heap,
m.LogBuffer,
m.PidsMap,
m.SslReadContext,
m.SslWriteContext,

Binary file not shown.

View File

@@ -78,6 +78,7 @@ type tlsTapperMapSpecs struct {
ConnectSyscallInfo *ebpf.MapSpec `ebpf:"connect_syscall_info"`
FileDescriptorToIpv4 *ebpf.MapSpec `ebpf:"file_descriptor_to_ipv4"`
Heap *ebpf.MapSpec `ebpf:"heap"`
LogBuffer *ebpf.MapSpec `ebpf:"log_buffer"`
PidsMap *ebpf.MapSpec `ebpf:"pids_map"`
SslReadContext *ebpf.MapSpec `ebpf:"ssl_read_context"`
SslWriteContext *ebpf.MapSpec `ebpf:"ssl_write_context"`
@@ -107,6 +108,7 @@ type tlsTapperMaps struct {
ConnectSyscallInfo *ebpf.Map `ebpf:"connect_syscall_info"`
FileDescriptorToIpv4 *ebpf.Map `ebpf:"file_descriptor_to_ipv4"`
Heap *ebpf.Map `ebpf:"heap"`
LogBuffer *ebpf.Map `ebpf:"log_buffer"`
PidsMap *ebpf.Map `ebpf:"pids_map"`
SslReadContext *ebpf.Map `ebpf:"ssl_read_context"`
SslWriteContext *ebpf.Map `ebpf:"ssl_write_context"`
@@ -119,6 +121,7 @@ func (m *tlsTapperMaps) Close() error {
m.ConnectSyscallInfo,
m.FileDescriptorToIpv4,
m.Heap,
m.LogBuffer,
m.PidsMap,
m.SslReadContext,
m.SslWriteContext,

Binary file not shown.

View File

@@ -1,6 +1,6 @@
{
"name": "@up9/mizu-common",
"version": "1.0.147",
"version": "1.0.145",
"description": "Made with create-react-library",
"author": "",
"license": "MIT",

View File

@@ -66,10 +66,8 @@
margin-top: -60px
.capture img
height: 14px
height: 20px
z-index: 1000
margin-top: 12px
margin-left: -2px
.endpointServiceContainer
display: flex

View File

@@ -140,6 +140,8 @@ export const EntryItem: React.FC<EntryProps> = ({entry, style, headingMode}) =>
const isStatusCodeEnabled = ((entry.proto.name === "http" && "status" in entry) || entry.status !== 0);
let endpointServiceContainer = "10px";
if (!isStatusCodeEnabled) endpointServiceContainer = "20px";
return <React.Fragment>
<div
@@ -176,7 +178,7 @@ export const EntryItem: React.FC<EntryProps> = ({entry, style, headingMode}) =>
{isStatusCodeEnabled && <div>
<StatusCode statusCode={entry.status} statusQuery={entry.statusQuery}/>
</div>}
<div className={styles.endpointServiceContainer} style={{paddingLeft: 10}}>
<div className={styles.endpointServiceContainer} style={{paddingLeft: endpointServiceContainer}}>
<Summary method={entry.method} methodQuery={entry.methodQuery} summary={entry.summary} summaryQuery={entry.summaryQuery}/>
<div className={styles.resolvedName}>
<Queryable

View File

@@ -110,8 +110,27 @@ export const TrafficViewer: React.FC<TrafficViewerProps> = ({ setAnalyzeStatus,
isCloseWebSocket && closeWebSocket()
}, [isCloseWebSocket])
useEffect(() => {
reopenConnection()
}, [webSocketUrl])
const ws = useRef(null);
const openEmptyWebSocket = () => {
if (query) {
openWebSocket(`(${query}) and leftOff(-1)`, true);
} else {
openWebSocket(`leftOff(-1)`, true);
}
}
const closeWebSocket = () => {
if(ws?.current?.readyState === WebSocket.OPEN) {
ws.current.close();
return true;
}
}
const listEntry = useRef(null);
const openWebSocket = (query: string, resetEntries: boolean) => {
if (resetEntries) {
@@ -153,12 +172,6 @@ export const TrafficViewer: React.FC<TrafficViewerProps> = ({ setAnalyzeStatus,
}, 500)
}
const closeWebSocket = () => {
if (ws?.current?.readyState === WebSocket.OPEN) {
ws.current.close();
}
}
if (ws.current) {
ws.current.onmessage = (e) => {
if (!e?.data) return;
@@ -216,8 +229,7 @@ export const TrafficViewer: React.FC<TrafficViewerProps> = ({ setAnalyzeStatus,
useEffect(() => {
setTrafficViewerApiState({ ...trafficViewerApiProp, webSocket: { close: closeWebSocket } });
(async () => {
openWebSocket("leftOff(-1)", true);
try {
try{
const tapStatusResponse = await trafficViewerApiProp.tapStatus();
setTappingStatus(tapStatusResponse);
if (setAnalyzeStatus) {
@@ -232,19 +244,18 @@ export const TrafficViewer: React.FC<TrafficViewerProps> = ({ setAnalyzeStatus,
}, []);
const toggleConnection = () => {
if (ws?.current?.readyState === WebSocket.OPEN) {
ws?.current?.close();
} else {
if (query) {
openWebSocket(`(${query}) and leftOff(-1)`, true);
} else {
openWebSocket(`leftOff(-1)`, true);
}
if(!closeWebSocket()) {
openEmptyWebSocket();
scrollableRef.current.jumpToBottom();
setIsSnappedToBottom(true);
}
}
const reopenConnection = async () => {
closeWebSocket()
openEmptyWebSocket();
}
useEffect(() => {
return () => {
ws.current.close();

View File

@@ -54,7 +54,7 @@ const Protocol: React.FC<ProtocolProps> = ({protocol, horizontal}) => {
backgroundColor: protocol.backgroundColor,
color: protocol.foregroundColor,
fontSize: protocol.fontSize,
marginRight: "-6px",
marginRight: "-20px",
}}
title={protocol.longName}
>

View File

@@ -9,7 +9,7 @@
text-align: center
line-height: 22px
font-weight: 600
margin-left: 3px
margin-left: 8px
.neutral
background: gray

View File

@@ -13,7 +13,7 @@
"@types/jest": "^26.0.22",
"@types/node": "^12.20.10",
"@uiw/react-textarea-code-editor": "^1.4.12",
"@up9/mizu-common": "1.0.147",
"@up9/mizu-common": "1.0.145",
"axios": "^0.25.0",
"core-js": "^3.20.2",
"craco-babel-loader": "^1.0.3",

View File

@@ -1,6 +1,6 @@
import React, {useEffect, useState} from "react";
import { Button } from "@material-ui/core";
import Api, { MizuWebsocketURL } from "../../../helpers/api";
import Api,{getWebsocketUrl} from "../../../helpers/api";
import debounce from 'lodash/debounce';
import {useSetRecoilState, useRecoilState} from "recoil";
import {useCommonStyles} from "../../../helpers/commonStyle"
@@ -65,7 +65,7 @@ const trafficViewerApi = {...api}
return (
<>
<TrafficViewer setAnalyzeStatus={setAnalyzeStatus} webSocketUrl={MizuWebsocketURL} isCloseWebSocket={!openWebSocket}
<TrafficViewer setAnalyzeStatus={setAnalyzeStatus} webSocketUrl={getWebsocketUrl()} isCloseWebSocket={!openWebSocket}
trafficViewerApiProp={trafficViewerApi} actionButtons={actionButtons} isShowStatusBar={!openOasModal} isDemoBannerView={false}/>
</>
);

View File

@@ -100,7 +100,6 @@ export const ServiceMapModal: React.FC<ServiceMapModalProps> = ({ isOpen, onOpen
const commonClasses = useCommonStyles();
const [isLoading, setIsLoading] = useState<boolean>(true);
const [graphData, setGraphData] = useState<GraphData>({ nodes: [], edges: [] });
const [graphOptions, setGraphOptions] = useState(ServiceMapOptions);
const getServiceMapData = useCallback(async () => {
try {
@@ -150,14 +149,6 @@ export const ServiceMapModal: React.FC<ServiceMapModalProps> = ({ isOpen, onOpen
// eslint-disable-next-line
}, [isOpen])
useEffect(() => {
if(graphData?.nodes?.length === 0) return;
let options = {...graphOptions};
options.physics.barnesHut.avoidOverlap = graphData?.nodes?.length > 10 ? 0 : 1;
setGraphOptions(options);
// eslint-disable-next-line
},[graphData?.nodes?.length])
useEffect(() => {
getServiceMapData();
return () => setGraphData({ nodes: [], edges: [] })
@@ -189,7 +180,7 @@ export const ServiceMapModal: React.FC<ServiceMapModalProps> = ({ isOpen, onOpen
<div style={{ display: "flex", justifyContent: "space-between" }}>
<div>
<Button
startIcon={<img src={refresh} className="custom" alt="refresh" style={{ marginRight: "8%" }}/>}
startIcon={<img src={refresh} className="custom" alt="refresh" style={{ marginRight: "8%" }}></img>}
size="medium"
variant="contained"
className={commonClasses.outlinedButton + " " + commonClasses.imagedButton}
@@ -198,20 +189,20 @@ export const ServiceMapModal: React.FC<ServiceMapModalProps> = ({ isOpen, onOpen
Refresh
</Button>
</div>
<img src={close} alt="close" onClick={() => onClose()} style={{ cursor: "pointer" }}/>
<img src={close} alt="close" onClick={() => onClose()} style={{ cursor: "pointer" }}></img>
</div>
<Graph
graph={graphData}
options={graphOptions}
options={ServiceMapOptions}
/>
<div className='legend-scale'>
<ul className='legend-labels'>
<li><span style={{ background: '#205cf5' }}/>HTTP</li>
<li><span style={{ background: '#244c5a' }}/>HTTP/2</li>
<li><span style={{ background: '#244c5a' }}/>gRPC</li>
<li><span style={{ background: '#ff6600' }}/>AMQP</li>
<li><span style={{ background: '#000000' }}/>KAFKA</li>
<li><span style={{ background: '#a41e11' }}/>REDIS</li>
<li><span style={{ background: '#205cf5' }}></span>HTTP</li>
<li><span style={{ background: '#244c5a' }}></span>HTTP/2</li>
<li><span style={{ background: '#244c5a' }}></span>gRPC</li>
<li><span style={{ background: '#ff6600' }}></span>AMQP</li>
<li><span style={{ background: '#000000' }}></span>KAFKA</li>
<li><span style={{ background: '#a41e11' }}></span>REDIS</li>
</ul>
</div>
</div>}
@@ -220,4 +211,4 @@ export const ServiceMapModal: React.FC<ServiceMapModalProps> = ({ isOpen, onOpen
</Modal>
);
}
}

View File

@@ -148,7 +148,7 @@ const ServiceMapOptions = {
springLength: 180,
springConstant: 0.04,
damping: 0.2,
avoidOverlap: 0
avoidOverlap: 1
},
},
layout: {
@@ -171,4 +171,4 @@ const ServiceMapOptions = {
},
};
export default ServiceMapOptions
export default ServiceMapOptions

View File

@@ -3,10 +3,13 @@ import * as axios from "axios";
export const MizuWebsocketURL = process.env.REACT_APP_OVERRIDE_WS_URL ? process.env.REACT_APP_OVERRIDE_WS_URL :
window.location.protocol === 'https:' ? `wss://${window.location.host}/ws` : `ws://${window.location.host}/ws`;
export const FormValidationErrorType = "formError";
const CancelToken = axios.CancelToken;
const apiURL = process.env.REACT_APP_OVERRIDE_API_URL ? process.env.REACT_APP_OVERRIDE_API_URL : `${window.location.origin}/`;
let token = ""
let client = null
let source = null
@@ -21,6 +24,8 @@ export default class Api {
}
constructor() {
token = localStorage.getItem("token");
client = this.getAxiosClient();
source = null;
}
@@ -120,10 +125,20 @@ export default class Api {
return response.data;
}
persistToken = (tk) => {
token = tk;
client = this.getAxiosClient();
localStorage.setItem('token', token);
}
getAxiosClient = () => {
const headers = {
Accept: "application/json"
}
if (token) {
headers['x-session-token'] = `${token}`; // we use `x-session-token` instead of `Authorization` because the latter is reserved by kubectl proxy, making mizu view not work
}
return axios.create({
baseURL: apiURL,
timeout: 31000,
@@ -131,3 +146,12 @@ export default class Api {
});
}
}
export function getWebsocketUrl() {
let websocketUrl = MizuWebsocketURL;
if (token) {
websocketUrl += `/${token}`;
}
return websocketUrl;
}