Compare commits

..

4 Commits

Author SHA1 Message Date
M. Mert Yıldıran
8cf6f56a3c Remove unnecessary tcpdump dependency from Dockerfile (#491) 2021-11-21 09:12:51 +03:00
M. Mert Yıldıran
a849aae94c Upgrade Basenine version from 0.2.9 to 0.2.10 (#484)
* Upgrade Basenine version from `0.2.9` to `0.2.10`

Fixes the issues in `limit` and `rlimit` helpers that occur when they are on the left operand of a binary expression.

* Upgrade the client hash to latest
2021-11-19 18:57:19 +03:00
M. Mert Yıldıran
8118569460 Show the source and destination IP in the entry feed (#485) 2021-11-18 20:21:51 +03:00
Nimrod Gilboa Markevich
2e75834dd0 Refactor watch pods to allow reusing watch wrapper (#470)
Currently shared/kubernetes/watch.go:FilteredWatch only watches pods.
This PR makes it reusable for other types of resources.
This is done in preparation for watching k8s events.
2021-11-18 11:53:11 +02:00
12 changed files with 212 additions and 64 deletions

View File

@@ -42,8 +42,8 @@ RUN go build -ldflags="-s -w \
-X 'mizuserver/pkg/version.SemVer=${SEM_VER}'" -o mizuagent .
# Download Basenine executable, verify the sha1sum and move it to a directory in $PATH
ADD https://github.com/up9inc/basenine/releases/download/v0.2.9/basenine_linux_amd64 ./basenine_linux_amd64
ADD https://github.com/up9inc/basenine/releases/download/v0.2.9/basenine_linux_amd64.sha256 ./basenine_linux_amd64.sha256
ADD https://github.com/up9inc/basenine/releases/download/v0.2.10/basenine_linux_amd64 ./basenine_linux_amd64
ADD https://github.com/up9inc/basenine/releases/download/v0.2.10/basenine_linux_amd64.sha256 ./basenine_linux_amd64.sha256
RUN shasum -a 256 -c basenine_linux_amd64.sha256
RUN chmod +x ./basenine_linux_amd64
@@ -52,7 +52,7 @@ RUN cd .. && /bin/bash build_extensions.sh
FROM alpine:3.14
RUN apk add bash libpcap-dev tcpdump
RUN apk add bash libpcap-dev
WORKDIR /app

View File

@@ -16,7 +16,7 @@ require (
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/up9inc/basenine/client/go v0.0.0-20211114204315-4d028da5fda5
github.com/up9inc/basenine/client/go v0.0.0-20211118123155-7ed075f85c73
github.com/up9inc/mizu/shared v0.0.0
github.com/up9inc/mizu/tap v0.0.0
github.com/up9inc/mizu/tap/api v0.0.0

View File

@@ -450,8 +450,8 @@ github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/up9inc/basenine/client/go v0.0.0-20211114204315-4d028da5fda5 h1:JbLairDLEJpAC8bwmFuOAB+LYpY/oQbzGRSWRpkF7PQ=
github.com/up9inc/basenine/client/go v0.0.0-20211114204315-4d028da5fda5/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
github.com/up9inc/basenine/client/go v0.0.0-20211118123155-7ed075f85c73 h1:FJUM7w7E0jRGFPcSMa7cVy+jr5zcpbyT6qA30dEtGGI=
github.com/up9inc/basenine/client/go v0.0.0-20211118123155-7ed075f85c73/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI=
github.com/vektah/gqlparser v1.1.2/go.mod h1:1ycwN7Ij5njmMkPPAOaRFY4rET2Enx7IkVv3vaXspKw=
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f h1:p4VB7kIXpOQvVn1ZaTIVp+3vuYAXFe3OJEvjbUYJLaA=
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=

View File

@@ -9,19 +9,18 @@ import (
"strings"
"time"
"github.com/up9inc/mizu/cli/cmd/goUtils"
"github.com/getkin/kin-openapi/openapi3"
"gopkg.in/yaml.v3"
core "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"github.com/getkin/kin-openapi/openapi3"
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/cmd/goUtils"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/errormessage"
"gopkg.in/yaml.v3"
core "k8s.io/api/core/v1"
"github.com/up9inc/mizu/cli/mizu"
"github.com/up9inc/mizu/cli/mizu/fsUtils"
"github.com/up9inc/mizu/cli/uiUtils"
@@ -555,7 +554,8 @@ func waitUntilNamespaceDeleted(ctx context.Context, cancel context.CancelFunc, k
func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", kubernetes.ApiServerPodName))
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, []string{config.Config.MizuResourcesNamespace}, podExactRegex)
podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper)
isPodReady := false
timeAfter := time.After(25 * time.Second)
for {
@@ -576,12 +576,19 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
logger.Log.Infof("%s removed", kubernetes.ApiServerPodName)
cancel()
return
case modifiedPod, ok := <-modified:
case wEvent, ok := <-modified:
if !ok {
modified = nil
continue
}
modifiedPod, err := wEvent.ToPod()
if err != nil {
logger.Log.Errorf(uiUtils.Error, err)
cancel()
continue
}
logger.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase)
if modifiedPod.Status.Phase == core.PodPending {
@@ -642,34 +649,57 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.TapperDaemonSetName))
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, []string{config.Config.MizuResourcesNamespace}, podExactRegex)
podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper)
var prevPodPhase core.PodPhase
for {
select {
case addedPod, ok := <-added:
case wEvent, ok := <-added:
if !ok {
added = nil
continue
}
addedPod, err := wEvent.ToPod()
if err != nil {
logger.Log.Errorf(uiUtils.Error, err)
cancel()
continue
}
logger.Log.Debugf("Tapper is created [%s]", addedPod.Name)
case removedPod, ok := <-removed:
case wEvent, ok := <-removed:
if !ok {
removed = nil
continue
}
removedPod, err := wEvent.ToPod()
if err != nil {
logger.Log.Errorf(uiUtils.Error, err)
cancel()
continue
}
logger.Log.Debugf("Tapper is removed [%s]", removedPod.Name)
case modifiedPod, ok := <-modified:
case wEvent, ok := <-modified:
if !ok {
modified = nil
continue
}
modifiedPod, err := wEvent.ToPod()
if err != nil {
logger.Log.Errorf(uiUtils.Error, err)
cancel()
continue
}
if modifiedPod.Status.Phase == core.PodPending && modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue {
logger.Log.Infof(uiUtils.Red, fmt.Sprintf("Wasn't able to deploy the tapper %s. Reason: \"%s\"", modifiedPod.Name, modifiedPod.Status.Conditions[0].Message))
cancel()
break
continue
}
podStatus := modifiedPod.Status

View File

@@ -37,8 +37,8 @@ COPY agent .
RUN go build -gcflags="all=-N -l" -o mizuagent .
# Download Basenine executable, verify the sha1sum and move it to a directory in $PATH
ADD https://github.com/up9inc/basenine/releases/download/v0.2.9/basenine_linux_amd64 ./basenine_linux_amd64
ADD https://github.com/up9inc/basenine/releases/download/v0.2.9/basenine_linux_amd64.sha256 ./basenine_linux_amd64.sha256
ADD https://github.com/up9inc/basenine/releases/download/v0.2.10/basenine_linux_amd64 ./basenine_linux_amd64
ADD https://github.com/up9inc/basenine/releases/download/v0.2.10/basenine_linux_amd64.sha256 ./basenine_linux_amd64.sha256
RUN shasum -a 256 -c basenine_linux_amd64.sha256
RUN chmod +x ./basenine_linux_amd64
@@ -48,7 +48,7 @@ RUN cd .. && /bin/bash build_extensions_debug.sh
FROM golang:1.16-alpine
RUN apk add bash libpcap-dev tcpdump
RUN apk add bash libpcap-dev
WORKDIR /app

View File

@@ -68,7 +68,8 @@ func CreateAndStartMizuTapperSyncer(ctx context.Context, kubernetesProvider *Pro
}
func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
added, modified, removed, errorChan := FilteredWatch(tapperSyncer.context, tapperSyncer.kubernetesProvider, tapperSyncer.config.TargetNamespaces, &tapperSyncer.config.PodFilterRegex)
podWatchHelper := NewPodWatchHelper(tapperSyncer.kubernetesProvider, &tapperSyncer.config.PodFilterRegex)
added, modified, removed, errorChan := FilteredWatch(tapperSyncer.context, podWatchHelper, tapperSyncer.config.TargetNamespaces, podWatchHelper)
restartTappers := func() {
err, changeFound := tapperSyncer.updateCurrentlyTappedPods()
@@ -94,28 +95,48 @@ func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
for {
select {
case pod, ok := <-added:
case wEvent, ok := <-added:
if !ok {
added = nil
continue
}
pod, err := wEvent.ToPod()
if err != nil {
tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer)
continue
}
logger.Log.Debugf("Added matching pod %s, ns: %s", pod.Name, pod.Namespace)
restartTappersDebouncer.SetOn()
case pod, ok := <-removed:
case wEvent, ok := <-removed:
if !ok {
removed = nil
continue
}
pod, err := wEvent.ToPod()
if err != nil {
tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer)
continue
}
logger.Log.Debugf("Removed matching pod %s, ns: %s", pod.Name, pod.Namespace)
restartTappersDebouncer.SetOn()
case pod, ok := <-modified:
case wEvent, ok := <-modified:
if !ok {
modified = nil
continue
}
pod, err := wEvent.ToPod()
if err != nil {
tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer)
continue
}
logger.Log.Debugf("Modified matching pod %s, ns: %s, phase: %s, ip: %s", pod.Name, pod.Namespace, pod.Status.Phase, pod.Status.PodIP)
// Act only if the modified pod has already obtained an IP address.
// After filtering for IPs, on a normal pod restart this includes the following events:
@@ -132,12 +153,8 @@ func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
continue
}
logger.Log.Debugf("Watching pods loop, got error %v, stopping `restart tappers debouncer`", err)
restartTappersDebouncer.Cancel()
tapperSyncer.ErrorOut <- K8sTapManagerError{
OriginalError: err,
TapManagerReason: TapManagerPodWatchError,
}
tapperSyncer.handleErrorInWatchLoop(err, restartTappersDebouncer)
continue
case <-tapperSyncer.context.Done():
logger.Log.Debugf("Watching pods loop, context done, stopping `restart tappers debouncer`")
@@ -148,6 +165,15 @@ func (tapperSyncer *MizuTapperSyncer) watchPodsForTapping() {
}
}
func (tapperSyncer *MizuTapperSyncer) handleErrorInWatchLoop(err error, restartTappersDebouncer *debounce.Debouncer) {
logger.Log.Debugf("Watching pods loop, got error %v, stopping `restart tappers debouncer`", err)
restartTappersDebouncer.Cancel()
tapperSyncer.ErrorOut <- K8sTapManagerError{
OriginalError: err,
TapManagerReason: TapManagerPodWatchError,
}
}
func (tapperSyncer *MizuTapperSyncer) updateCurrentlyTappedPods() (err error, changesFound bool) {
if matchingPods, err := tapperSyncer.kubernetesProvider.ListAllRunningPodsMatchingRegex(tapperSyncer.context, &tapperSyncer.config.PodFilterRegex, tapperSyncer.config.TargetNamespaces); err != nil {
return err, false

View File

@@ -0,0 +1,45 @@
package kubernetes
import (
"context"
"regexp"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
)
type PodWatchHelper struct {
kubernetesProvider *Provider
NameRegexFilter *regexp.Regexp
}
func NewPodWatchHelper(kubernetesProvider *Provider, NameRegexFilter *regexp.Regexp) *PodWatchHelper {
return &PodWatchHelper{
kubernetesProvider: kubernetesProvider,
NameRegexFilter: NameRegexFilter,
}
}
// Implements the EventFilterer Interface
func (pwh *PodWatchHelper) Filter(wEvent *WatchEvent) (bool, error) {
pod, err := wEvent.ToPod()
if err != nil {
return false, nil
}
if !pwh.NameRegexFilter.MatchString(pod.Name) {
return false, nil
}
return true, nil
}
// Implements the WatchCreator Interface
func (pwh *PodWatchHelper) NewWatcher(ctx context.Context, namespace string) (watch.Interface, error) {
watcher, err := pwh.kubernetesProvider.clientSet.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{Watch: true})
if err != nil {
return nil, err
}
return watcher, nil
}

View File

@@ -153,14 +153,6 @@ func (provider *Provider) WaitUtilNamespaceDeleted(ctx context.Context, name str
return err
}
func (provider *Provider) GetPodWatcher(ctx context.Context, namespace string) watch.Interface {
watcher, err := provider.clientSet.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{Watch: true})
if err != nil {
panic(err.Error())
}
return watcher
}
func (provider *Provider) CreateNamespace(ctx context.Context, name string) (*core.Namespace, error) {
namespaceSpec := &core.Namespace{
ObjectMeta: metav1.ObjectMeta{

View File

@@ -6,19 +6,25 @@ import (
"fmt"
"github.com/up9inc/mizu/shared/debounce"
"github.com/up9inc/mizu/shared/logger"
"regexp"
"sync"
"time"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/watch"
)
func FilteredWatch(ctx context.Context, kubernetesProvider *Provider, targetNamespaces []string, podFilter *regexp.Regexp) (chan *corev1.Pod, chan *corev1.Pod, chan *corev1.Pod, chan error) {
addedChan := make(chan *corev1.Pod)
modifiedChan := make(chan *corev1.Pod)
removedChan := make(chan *corev1.Pod)
type EventFilterer interface {
Filter(*WatchEvent) (bool, error)
}
type WatchCreator interface {
NewWatcher(ctx context.Context, namespace string) (watch.Interface, error)
}
func FilteredWatch(ctx context.Context, watcherCreator WatchCreator, targetNamespaces []string, filterer EventFilterer) (chan *WatchEvent, chan *WatchEvent, chan *WatchEvent, chan error) {
addedChan := make(chan *WatchEvent)
modifiedChan := make(chan *WatchEvent)
removedChan := make(chan *WatchEvent)
errorChan := make(chan error)
var wg sync.WaitGroup
@@ -31,8 +37,13 @@ func FilteredWatch(ctx context.Context, kubernetesProvider *Provider, targetName
watchRestartDebouncer := debounce.NewDebouncer(1 * time.Minute, func() {})
for {
watcher := kubernetesProvider.GetPodWatcher(ctx, targetNamespace)
err := startWatchLoop(ctx, watcher, podFilter, addedChan, modifiedChan, removedChan) // blocking
watcher, err := watcherCreator.NewWatcher(ctx, targetNamespace)
if err != nil {
errorChan <- fmt.Errorf("error in k8 watch: %v", err)
break
}
err = startWatchLoop(ctx, watcher, filterer, addedChan, modifiedChan, removedChan) // blocking
watcher.Stop()
select {
@@ -72,7 +83,7 @@ func FilteredWatch(ctx context.Context, kubernetesProvider *Provider, targetName
return addedChan, modifiedChan, removedChan, errorChan
}
func startWatchLoop(ctx context.Context, watcher watch.Interface, podFilter *regexp.Regexp, addedChan chan *corev1.Pod, modifiedChan chan *corev1.Pod, removedChan chan *corev1.Pod) error {
func startWatchLoop(ctx context.Context, watcher watch.Interface, filterer EventFilterer, addedChan chan *WatchEvent, modifiedChan chan *WatchEvent, removedChan chan *WatchEvent) error {
resultChan := watcher.ResultChan()
for {
select {
@@ -81,26 +92,25 @@ func startWatchLoop(ctx context.Context, watcher watch.Interface, podFilter *reg
return nil
}
if e.Type == watch.Error {
return apierrors.FromObject(e.Object)
wEvent := WatchEvent(e)
if wEvent.Type == watch.Error {
return apierrors.FromObject(wEvent.Object)
}
pod, ok := e.Object.(*corev1.Pod)
if !ok {
if pass, err := filterer.Filter(&wEvent); err != nil {
return err
} else if !pass {
continue
}
if !podFilter.MatchString(pod.Name) {
continue
}
switch e.Type {
switch wEvent.Type {
case watch.Added:
addedChan <- pod
addedChan <- &wEvent
case watch.Modified:
modifiedChan <- pod
modifiedChan <- &wEvent
case watch.Deleted:
removedChan <- pod
removedChan <- &wEvent
}
case <-ctx.Done():
return nil

View File

@@ -0,0 +1,18 @@
package kubernetes
import (
"fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/watch"
)
type WatchEvent watch.Event
func (we *WatchEvent) ToPod() (*corev1.Pod, error) {
pod, ok := we.Object.(*corev1.Pod)
if !ok {
return nil, fmt.Errorf("Invalid object type on pod event stream")
}
return pod, nil
}

View File

@@ -84,7 +84,14 @@
padding: 4px
padding-left: 12px
.port
.tcpInfo
font-size: 12px
color: $secondary-font-color
margin: 5px
margin-top: 5px
margin-bottom: 5px
.port
margin-right: 5px
.ip
margin-left: 5px

View File

@@ -171,7 +171,17 @@ export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, style
}
<div className={styles.separatorRight}>
<span
className={`queryable ${styles.port}`}
className={`queryable ${styles.tcpInfo} ${styles.ip}`}
title="Source IP"
onClick={() => {
updateQuery(`src.ip == "${entry.sourceIp}"`)
}}
>
{entry.sourceIp}
</span>
<span className={`${styles.tcpInfo}`}>:</span>
<span
className={`queryable ${styles.tcpInfo} ${styles.port}`}
title="Source Port"
onClick={() => {
updateQuery(`src.port == "${entry.sourcePort}"`)
@@ -199,7 +209,17 @@ export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, style
/>
}
<span
className={`queryable ${styles.port}`}
className={`queryable ${styles.tcpInfo} ${styles.ip}`}
title="Destination IP"
onClick={() => {
updateQuery(`dst.ip == "${entry.destinationIp}"`)
}}
>
{entry.destinationIp}
</span>
<span className={`${styles.tcpInfo}`}>:</span>
<span
className={`queryable ${styles.tcpInfo} ${styles.port}`}
title="Destination Port"
onClick={() => {
updateQuery(`dst.port == "${entry.destinationPort}"`)