Compare commits

..

35 Commits
38.0 ... 38.5

Author SHA1 Message Date
M. Mert Yildiran
a9fdde4110 🐛 Fix the missing / in image URL 2023-01-29 03:30:23 +03:00
M. Mert Yildiran
4eeab41e6d Don't fail if self namespace does not exist 2023-01-29 01:57:05 +03:00
M. Mert Yildiran
dce7de6cc9 Treat Docker registry as an absolute prefix 2023-01-29 01:56:18 +03:00
M. Mert Yildiran
c809117b2c 🐛 Don't open the Hub URL in the browser (proxy command) 2023-01-26 19:35:21 +03:00
M. Mert Yildiran
2c6c71cf49 Log the worker DaemonSet creation 2023-01-22 05:14:28 +03:00
M. Mert Yildiran
1533d1ec28 Suggest kubeshark proxy at the end of tap 2023-01-22 05:10:29 +03:00
M. Mert Yildiran
edf5c8cd6d Don't call cancel() in case of a proxy/port-forward error 2023-01-22 05:04:44 +03:00
M. Mert Yildiran
6c69fb6bc4 POST pod regex to Hub if Kubeshark is still running 2023-01-22 04:33:30 +03:00
M. Mert Yildiran
4dd941f8d9 Set REACT_APP_DEFAULT_FILTER to empty string 2023-01-22 04:27:05 +03:00
M. Mert Yildiran
90294e32c1 👕 Remove unused getK8sTapManagerErrorText method 2023-01-22 04:26:21 +03:00
M. Mert Yildiran
5f4a856c5e 🐛 Fix the proxy command 2023-01-22 04:25:47 +03:00
M. Mert Yildiran
4e3233ade8 Deploy workers DaemonSet without NodeSelector and call POST /pods/regex to set the pod regex in Hub 2023-01-22 04:14:44 +03:00
M. Mert Yildiran
846f253a03 Don't watch and POST Worker pods to Hub 2023-01-21 03:09:15 +03:00
M. Mert Yildiran
f128ae3993 🔥 Remove config map and image pull checks 2023-01-21 01:32:21 +03:00
Alon Girmonsky
38da25ecc8 Infra information
Lately we've been experiencing bugs resulting from an environment with multiple nodes and the absence of a CNI.
2023-01-18 10:04:47 -08:00
M. Mert Yildiran
bf777f9fca Change the color coding of new, targeted and untargeted pods log messages 2023-01-17 02:09:26 +03:00
M. Mert Yildiran
3c4272c6d1 🐛 Fix the issues in port-forward 2023-01-16 00:06:52 +03:00
Alon Girmonsky
920535b643 protocol-aware 2023-01-15 10:59:16 -08:00
Alon Girmonsky
b55dd5b072 Shorter message bar 2023-01-15 10:47:13 -08:00
Alon Girmonsky
f5cf15d657 Changing the hero message 2023-01-15 10:46:06 -08:00
Alon Girmonsky
fe623438c4 Update the upper message
include DNS and identity-aware service map.
2023-01-14 21:25:00 -08:00
M. Mert Yildiran
1c35a9f143 🐛 Fix the parsing of --proxy-front-port and --proxy-hub-port options 2023-01-15 02:31:47 +03:00
M. Mert Yildiran
d6766a8ac5 🐛 Fix .spec.template.spec.imagePullSecrets: element 0: associative list with keys has an element that omits key field \"name\" error 2023-01-08 21:47:00 +03:00
M. Mert Yildiran
eaa21f0789 🔨 Fix the targetted typo 2023-01-08 21:44:12 +03:00
M. Mert Yildiran
ac6aee07f5 Update README.md 2023-01-07 14:36:26 +03:00
M. Mert Yildiran
5b428c5636 Update README.md 2023-01-07 14:35:12 +03:00
M. Mert Yildiran
894f97ca41 Add an option to set the ImagePullSecrets 2023-01-07 14:20:01 +03:00
Atıl Sensalduz
45f8c8a834 docs: add how to install step for brew (#1286) 2023-01-03 15:14:38 -08:00
Alon Girmonsky
75ff80218b Update README.md 2022-12-31 14:14:19 -08:00
Alon Girmonsky
6255e1d4ef Update README.md 2022-12-31 14:10:32 -08:00
Alon Girmonsky
8ea606fb59 Update README.md 2022-12-31 10:15:05 -08:00
Alon Girmonsky
c9d41c53ca Update README.md 2022-12-30 21:45:13 -08:00
Alon Girmonsky
3995dae16d Update README.md 2022-12-30 21:41:56 -08:00
Alon Girmonsky
447fcf1b31 Change the center message
From Mizu to Distributed PCAP storage
2022-12-30 21:39:29 -08:00
M. Mert Yildiran
7e7fa772e1 🐛 Have a short caller (file:line) log 2022-12-30 08:30:48 +03:00
27 changed files with 348 additions and 820 deletions

View File

@@ -10,6 +10,9 @@ assignees: ''
**Describe the bug**
A clear and concise description of what the bug is.
**Provide more information**
Running on EKS, AKS, GKE, Minikube, Rancher, OpenShift? Number of Nodes? CNI?
**To Reproduce**
Steps to reproduce the behavior:
1. Run `kubeshark <command> ...`

View File

@@ -23,87 +23,61 @@
</a>
</p>
<p>
<p align="center">
Mizu (by UP9) is now Kubeshark, read more about it <a href="https://www.kubeshark.co/mizu-is-now-kubeshark">here</a>.
<b>
<a href="https://github.com/kubeshark/kubeshark/releases/latest">V38.2</a> is out with <a href="https://docs.kubeshark.co/en/pcap_export_import">PCAP</a> export, <a href="https://docs.kubeshark.co/en/dns">DNS</a>, <a href="https://docs.kubeshark.co/en/service_map">Identity-aware Service Map</a> and so much more. Read about it <a href="https://kubeshark.co/pcap-or-it-didnt-happen">here</a>.
</b>
</p>
Kubeshark, the API Traffic Viewer for kubernetes, provides deep visibility and monitoring of all API traffic and payloads going in, out and across containers and pods inside a Kubernetes cluster.
Think of a combination of Chrome Dev Tools, TCPDump and Wireshark, re-invented for Kubernetes.
**Kubeshark** is an API Traffic Viewer for [**Kubernetes**](https://kubernetes.io/) providing real-time, protocol-aware visibility into Kubernetes internal network, capturing, dissecting and monitoring all traffic and payloads going in, out and across containers, pods, nodes and clusters.
![Simple UI](https://github.com/kubeshark/assets/raw/master/png/kubeshark-ui.png)
## Download
Think [TCPDump](https://en.wikipedia.org/wiki/Tcpdump) and [Wireshark](https://www.wireshark.org/) re-invented for Kubernetes
Kubeshark uses a ~45MB pre-compiled executable binary to communicate with the Kubernetes API. We recommend downloading the `kubeshark` CLI by using one of these options:
## Getting Started
- Choose the right binary, download and use directly from [the latest stable release](https://github.com/kubeshark/kubeshark/releases/latest).
- Use the shell script below :point_down: to automatically download the right binary for your operating system and CPU architecture:
```shell
sh <(curl -Ls https://kubeshark.co/install)
```
- Compile it from source using `make` command then use `./bin/kubeshark__` executable.
## Run
Use the `kubeshark` CLI to capture and view streaming API traffic in real time.
Download **Kubeshark**'s binary distribution [latest release](https://github.com/kubeshark/kubeshark/releases/latest) and run following one of these examples:
```shell
kubeshark tap
```
### Troubleshooting Installation
If something doesn't work or simply to play it safe prior to installing;
> Make sure you have access to https://hub.docker.com/
> Make sure `kubeshark` executable in your `PATH`.
### Select Pods
#### Monitoring a Specific Pod:
```shell
kubeshark tap catalogue-b87b45784-sxc8q
```
#### Monitoring a Set of Pods Using Regex:
```shell
kubeshark tap "(catalo*|front-end*)"
```
### Specify the Namespace
By default, Kubeshark targets the `default` namespace.
To specify a different namespace:
```
kubeshark tap -n sock-shop
```
### Specify All Namespaces
The default strategy of Kubeshark waits for the new pods
to be created. To simply tap all existing namespaces run:
```
kubeshark tap -A
```
```shell
kubeshark tap -n sock-shop "(catalo*|front-end*)"
```
Running any of the :point_up: above commands will open the [Web UI](https://docs.kubeshark.co/en/ui) in your browser which streams the traffic in your Kubernetes cluster in real-time.
### Homebrew
[Homebrew](https://brew.sh/) :beer: users can add Kubeshark formulae with:
```shell
brew tap kubeshark/kubeshark
```
and install Kubeshark CLI with:
```shell
brew install kubeshark
```
## Building From Source
Clone this repository and run `make` command to build it. After the build is complete, the executable can be found at `./bin/kubeshark__`.
## Documentation
Visit our documentation website: [docs.kubeshark.co](https://docs.kubeshark.co)
The documentation resources are open-source and can be found on GitHub: [kubeshark/docs](https://github.com/kubeshark/docs)
To learn more, read the [documentation](https://docs.kubeshark.co).
## Contributing
We ❤️ pull requests! See [CONTRIBUTING.md](CONTRIBUTING.md) for the contribution guide.
We :heart: pull requests! See [CONTRIBUTING.md](CONTRIBUTING.md) for the contribution guide.
## Code of Conduct

View File

@@ -1,123 +0,0 @@
package check
import (
"context"
"fmt"
"regexp"
"time"
"github.com/kubeshark/kubeshark/docker"
"github.com/kubeshark/kubeshark/kubernetes"
"github.com/kubeshark/kubeshark/misc"
"github.com/kubeshark/kubeshark/utils"
"github.com/rs/zerolog/log"
core "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func ImagePullInCluster(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool {
log.Info().Str("procedure", "image-pull-in-cluster").Msg("Checking:")
namespace := "default"
podName := fmt.Sprintf("%s-test", misc.Program)
defer func() {
if err := kubernetesProvider.RemovePod(ctx, namespace, podName); err != nil {
log.Error().
Str("namespace", namespace).
Str("pod", podName).
Err(err).
Msg("While removing test pod!")
}
}()
if err := createImagePullInClusterPod(ctx, kubernetesProvider, namespace, podName); err != nil {
log.Error().
Str("namespace", namespace).
Str("pod", podName).
Err(err).
Msg("While creating test pod!")
return false
}
if err := checkImagePulled(ctx, kubernetesProvider, namespace, podName); err != nil {
log.Printf("%v cluster is not able to pull %s containers from docker hub, err: %v", misc.Program, fmt.Sprintf(utils.Red, "✗"), err)
log.Error().
Str("namespace", namespace).
Str("pod", podName).
Err(err).
Msg("Unable to pull images from Docker Hub!")
return false
}
log.Info().
Str("namespace", namespace).
Str("pod", podName).
Msg("Pulling images from Docker Hub is passed.")
return true
}
func checkImagePulled(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespace string, podName string) error {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", podName))
podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
eventChan, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{namespace}, podWatchHelper)
timeAfter := time.After(30 * time.Second)
for {
select {
case wEvent, ok := <-eventChan:
if !ok {
eventChan = nil
continue
}
pod, err := wEvent.ToPod()
if err != nil {
return err
}
if pod.Status.Phase == core.PodRunning {
return nil
}
case err, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
return err
case <-timeAfter:
return fmt.Errorf("image not pulled in time")
}
}
}
func createImagePullInClusterPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespace string, podName string) error {
image := docker.GetWorkerImage()
log.Info().Str("image", image).Msg("Testing image pull:")
var zero int64
pod := &core.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
},
Spec: core.PodSpec{
Containers: []core.Container{
{
Name: "probe",
Image: image,
ImagePullPolicy: "Always",
Command: []string{"cat"},
Stdin: true,
},
},
TerminationGracePeriodSeconds: &zero,
},
}
if _, err := kubernetesProvider.CreatePod(ctx, namespace, pod); err != nil {
return err
}
return nil
}

View File

@@ -15,9 +15,6 @@ func KubernetesResources(ctx context.Context, kubernetesProvider *kubernetes.Pro
exist, err := kubernetesProvider.DoesNamespaceExist(ctx, config.Config.SelfNamespace)
allResourcesExist := checkResourceExist(config.Config.SelfNamespace, "namespace", exist, err)
exist, err = kubernetesProvider.DoesConfigMapExist(ctx, config.Config.SelfNamespace, kubernetes.ConfigMapName)
allResourcesExist = checkResourceExist(kubernetes.ConfigMapName, "config map", exist, err) && allResourcesExist
exist, err = kubernetesProvider.DoesServiceAccountExist(ctx, config.Config.SelfNamespace, kubernetes.ServiceAccountName)
allResourcesExist = checkResourceExist(kubernetes.ServiceAccountName, "service account", exist, err) && allResourcesExist

View File

@@ -33,9 +33,6 @@ func runCheck() {
checkPassed = check.KubernetesPermissions(ctx, embedFS, kubernetesProvider)
}
if checkPassed {
checkPassed = check.ImagePullInCluster(ctx, kubernetesProvider)
}
if checkPassed {
checkPassed = check.KubernetesResources(ctx, kubernetesProvider)
}

View File

@@ -10,5 +10,5 @@ func performCleanCommand() {
return
}
finishSelfExecution(kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.SelfNamespace)
finishSelfExecution(kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.SelfNamespace, false)
}

View File

@@ -18,32 +18,32 @@ import (
"github.com/rs/zerolog/log"
)
func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx context.Context, cancel context.CancelFunc, serviceName string, proxyPortLabel string, srcPort uint16, dstPort uint16, healthCheck string) {
httpServer, err := kubernetes.StartProxy(kubernetesProvider, config.Config.Tap.Proxy.Host, srcPort, config.Config.SelfNamespace, serviceName, cancel)
func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx context.Context, serviceName string, podName string, proxyPortLabel string, srcPort uint16, dstPort uint16, healthCheck string) {
httpServer, err := kubernetes.StartProxy(kubernetesProvider, config.Config.Tap.Proxy.Host, srcPort, config.Config.SelfNamespace, serviceName)
if err != nil {
log.Error().
Err(errormessage.FormatError(err)).
Msg(fmt.Sprintf("Error occured while running k8s proxy. Try setting different port by using --%s", proxyPortLabel))
cancel()
Msg(fmt.Sprintf("Error occured while running K8s proxy. Try setting different port using --%s", proxyPortLabel))
return
}
connector := connect.NewConnector(kubernetes.GetLocalhostOnPort(srcPort), connect.DefaultRetries, connect.DefaultTimeout)
if err := connector.TestConnection(healthCheck); err != nil {
log.Error().Msg("Couldn't connect using proxy, stopping proxy and trying to create port-forward..")
log.Warn().
Str("service", serviceName).
Msg("Couldn't connect using proxy, stopping proxy and trying to create port-forward...")
if err := httpServer.Shutdown(ctx); err != nil {
log.Error().
Err(errormessage.FormatError(err)).
Msg("Error occurred while stopping proxy.")
}
podRegex, _ := regexp.Compile(kubernetes.HubPodName)
if _, err := kubernetes.NewPortForward(kubernetesProvider, config.Config.SelfNamespace, podRegex, srcPort, dstPort, ctx, cancel); err != nil {
podRegex, _ := regexp.Compile(podName)
if _, err := kubernetes.NewPortForward(kubernetesProvider, config.Config.SelfNamespace, podRegex, srcPort, dstPort, ctx); err != nil {
log.Error().
Str("pod-regex", podRegex.String()).
Err(errormessage.FormatError(err)).
Msg(fmt.Sprintf("Error occured while running port forward. Try setting different port by using --%s", proxyPortLabel))
cancel()
Msg(fmt.Sprintf("Error occured while running port forward. Try setting different port using --%s", proxyPortLabel))
return
}
@@ -53,7 +53,6 @@ func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx con
Str("service", serviceName).
Err(errormessage.FormatError(err)).
Msg("Couldn't connect to service.")
cancel()
return
}
}
@@ -97,11 +96,13 @@ func handleKubernetesProviderError(err error) {
}
}
func finishSelfExecution(kubernetesProvider *kubernetes.Provider, isNsRestrictedMode bool, selfNamespace string) {
func finishSelfExecution(kubernetesProvider *kubernetes.Provider, isNsRestrictedMode bool, selfNamespace string, withoutCleanup bool) {
removalCtx, cancel := context.WithTimeout(context.Background(), cleanupTimeout)
defer cancel()
dumpLogsIfNeeded(removalCtx, kubernetesProvider)
resources.CleanUpSelfResources(removalCtx, cancel, kubernetesProvider, isNsRestrictedMode, selfNamespace)
if !withoutCleanup {
resources.CleanUpSelfResources(removalCtx, cancel, kubernetesProvider, isNsRestrictedMode, selfNamespace)
}
}
func dumpLogsIfNeeded(ctx context.Context, kubernetesProvider *kubernetes.Provider) {

View File

@@ -26,7 +26,7 @@ func runProxy() {
exists, err := kubernetesProvider.DoesServiceExist(ctx, config.Config.SelfNamespace, kubernetes.FrontServiceName)
if err != nil {
log.Error().
Str("service", misc.Program).
Str("service", kubernetes.FrontServiceName).
Err(err).
Msg("Failed to found service!")
cancel()
@@ -42,36 +42,96 @@ func runProxy() {
return
}
url := kubernetes.GetLocalhostOnPort(config.Config.Tap.Proxy.Front.SrcPort)
exists, err = kubernetesProvider.DoesServiceExist(ctx, config.Config.SelfNamespace, kubernetes.HubServiceName)
if err != nil {
log.Error().
Str("service", kubernetes.HubServiceName).
Err(err).
Msg("Failed to found service!")
cancel()
return
}
response, err := http.Get(fmt.Sprintf("%s/", url))
if !exists {
log.Error().
Str("service", kubernetes.HubServiceName).
Str("command", fmt.Sprintf("%s %s", misc.Program, tapCmd.Use)).
Msg("Service not found! You should run the command first:")
cancel()
return
}
var establishedProxy bool
hubUrl := kubernetes.GetLocalhostOnPort(config.Config.Tap.Proxy.Hub.SrcPort)
response, err := http.Get(fmt.Sprintf("%s/", hubUrl))
if err == nil && response.StatusCode == 200 {
log.Info().
Str("service", kubernetes.HubServiceName).
Int("port", int(config.Config.Tap.Proxy.Hub.SrcPort)).
Msg("Found a running service.")
okToOpen("Hub", hubUrl, true)
} else {
startProxyReportErrorIfAny(
kubernetesProvider,
ctx,
kubernetes.HubServiceName,
kubernetes.HubPodName,
configStructs.ProxyHubPortLabel,
config.Config.Tap.Proxy.Hub.SrcPort,
config.Config.Tap.Proxy.Hub.DstPort,
"/echo",
)
connector := connect.NewConnector(hubUrl, connect.DefaultRetries, connect.DefaultTimeout)
if err := connector.TestConnection("/echo"); err != nil {
log.Error().Msg(fmt.Sprintf(utils.Red, "Couldn't connect to Hub."))
return
}
establishedProxy = true
okToOpen("Hub", hubUrl, true)
}
frontUrl := kubernetes.GetLocalhostOnPort(config.Config.Tap.Proxy.Front.SrcPort)
response, err = http.Get(fmt.Sprintf("%s/", frontUrl))
if err == nil && response.StatusCode == 200 {
log.Info().
Str("service", kubernetes.FrontServiceName).
Int("port", int(config.Config.Tap.Proxy.Front.SrcPort)).
Msg("Found a running service.")
okToOpen(url)
return
}
log.Info().Msg("Establishing connection to K8s cluster...")
startProxyReportErrorIfAny(kubernetesProvider, ctx, cancel, kubernetes.FrontServiceName, configStructs.ProxyFrontPortLabel, config.Config.Tap.Proxy.Front.SrcPort, config.Config.Tap.Proxy.Front.DstPort, "")
okToOpen("Kubeshark", frontUrl, false)
} else {
startProxyReportErrorIfAny(
kubernetesProvider,
ctx,
kubernetes.FrontServiceName,
kubernetes.FrontPodName,
configStructs.ProxyFrontPortLabel,
config.Config.Tap.Proxy.Front.SrcPort,
config.Config.Tap.Proxy.Front.DstPort,
"",
)
connector := connect.NewConnector(frontUrl, connect.DefaultRetries, connect.DefaultTimeout)
if err := connector.TestConnection(""); err != nil {
log.Error().Msg(fmt.Sprintf(utils.Red, "Couldn't connect to Front."))
return
}
connector := connect.NewConnector(url, connect.DefaultRetries, connect.DefaultTimeout)
if err := connector.TestConnection(""); err != nil {
log.Error().Msg(fmt.Sprintf(utils.Red, "Couldn't connect to Front."))
return
establishedProxy = true
okToOpen("Kubeshark", frontUrl, false)
}
okToOpen(url)
utils.WaitForTermination(ctx, cancel)
if establishedProxy {
utils.WaitForTermination(ctx, cancel)
}
}
func okToOpen(url string) {
log.Info().Str("url", url).Msg(fmt.Sprintf(utils.Green, fmt.Sprintf("%s is available at:", misc.Software)))
func okToOpen(name string, url string, noBrowser bool) {
log.Info().Str("url", url).Msg(fmt.Sprintf(utils.Green, fmt.Sprintf("%s is available at:", name)))
if !config.Config.HeadlessMode {
if !config.Config.HeadlessMode && !noBrowser {
utils.OpenBrowser(url)
}
}

View File

@@ -46,6 +46,8 @@ func init() {
tapCmd.Flags().StringP(configStructs.DockerRegistryLabel, "r", defaultTapConfig.Docker.Registry, "The Docker registry that's hosting the images.")
tapCmd.Flags().StringP(configStructs.DockerTagLabel, "t", defaultTapConfig.Docker.Tag, "The tag of the Docker images that are going to be pulled.")
tapCmd.Flags().String(configStructs.DockerImagePullPolicy, defaultTapConfig.Docker.ImagePullPolicy, "ImagePullPolicy for the Docker images.")
tapCmd.Flags().StringSlice(configStructs.DockerImagePullSecrets, defaultTapConfig.Docker.ImagePullSecrets, "ImagePullSecrets for the Docker images.")
tapCmd.Flags().Uint16(configStructs.ProxyFrontPortLabel, defaultTapConfig.Proxy.Front.SrcPort, "Provide a custom port for the front-end proxy/port-forward.")
tapCmd.Flags().Uint16(configStructs.ProxyHubPortLabel, defaultTapConfig.Proxy.Hub.SrcPort, "Provide a custom port for the Hub proxy/port-forward.")
tapCmd.Flags().String(configStructs.ProxyHostLabel, defaultTapConfig.Proxy.Host, "Provide a custom host for the proxy/port-forward.")

View File

@@ -71,9 +71,9 @@ func tap() {
}
}
log.Info().Strs("namespaces", state.targetNamespaces).Msg("Targetting pods in:")
log.Info().Strs("namespaces", state.targetNamespaces).Msg("Targeting pods in:")
if err := printTargettedPodsPreview(ctx, kubernetesProvider, state.targetNamespaces); err != nil {
if err := printTargetedPodsPreview(ctx, kubernetesProvider, state.targetNamespaces); err != nil {
log.Error().Err(errormessage.FormatError(err)).Msg("Error listing pods!")
}
@@ -82,10 +82,12 @@ func tap() {
}
log.Info().Msg(fmt.Sprintf("Waiting for the creation of %s resources...", misc.Software))
if state.selfServiceAccountExists, err = resources.CreateHubResources(ctx, kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.SelfNamespace, config.Config.Tap.Resources.Hub, config.Config.ImagePullPolicy(), config.Config.Tap.Debug); err != nil {
if state.selfServiceAccountExists, err = resources.CreateHubResources(ctx, kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.SelfNamespace, config.Config.Tap.Resources.Hub, config.Config.ImagePullPolicy(), config.Config.ImagePullSecrets(), config.Config.Tap.Debug); err != nil {
var statusError *k8serrors.StatusError
if errors.As(err, &statusError) && (statusError.ErrStatus.Reason == metav1.StatusReasonAlreadyExists) {
log.Warn().Msg(fmt.Sprintf("%s is already running in this namespace, change the `selfnamespace` configuration or run `%s clean` to remove the currently running %s instance", misc.Software, misc.Program, misc.Software))
log.Info().Msg(fmt.Sprintf("%s is already running in this namespace, change the `selfnamespace` configuration or run `%s clean` to remove the currently running %s instance.", misc.Software, misc.Program, misc.Software))
connector.PostRegexToHub(config.Config.Tap.PodRegexStr, state.targetNamespaces)
log.Info().Msg("Updated the targeted pods. Exiting.")
} else {
defer resources.CleanUpSelfResources(ctx, cancel, kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.SelfNamespace)
log.Error().Err(errormessage.FormatError(err)).Msg("Error creating resources!")
@@ -102,10 +104,13 @@ func tap() {
// block until exit signal or error
utils.WaitForTermination(ctx, cancel)
log.Warn().
Str("command", fmt.Sprintf("%s proxy", misc.Program)).
Msg(fmt.Sprintf(utils.Yellow, "To re-establish a proxy/port-forward, run:"))
}
func finishTapExecution(kubernetesProvider *kubernetes.Provider) {
finishSelfExecution(kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.SelfNamespace)
finishSelfExecution(kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.SelfNamespace, true)
}
/*
@@ -113,69 +118,20 @@ This function is a bit problematic as it might be detached from the actual pods
The alternative would be to wait for Hub to be ready and then query it for the pods it listens to, this has
the arguably worse drawback of taking a relatively very long time before the user sees which pods are targeted, if any.
*/
func printTargettedPodsPreview(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespaces []string) error {
func printTargetedPodsPreview(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespaces []string) error {
if matchingPods, err := kubernetesProvider.ListAllRunningPodsMatchingRegex(ctx, config.Config.Tap.PodRegex(), namespaces); err != nil {
return err
} else {
if len(matchingPods) == 0 {
printNoPodsFoundSuggestion(namespaces)
}
for _, targettedPod := range matchingPods {
log.Info().Msg(fmt.Sprintf("New pod: %s", fmt.Sprintf(utils.Green, targettedPod.Name)))
for _, targetedPod := range matchingPods {
log.Info().Msg(fmt.Sprintf("Targeted pod: %s", fmt.Sprintf(utils.Green, targetedPod.Name)))
}
return nil
}
}
func startWorkerSyncer(ctx context.Context, cancel context.CancelFunc, provider *kubernetes.Provider, targetNamespaces []string, startTime time.Time) error {
workerSyncer, err := kubernetes.CreateAndStartWorkerSyncer(ctx, provider, kubernetes.WorkerSyncerConfig{
TargetNamespaces: targetNamespaces,
PodFilterRegex: *config.Config.Tap.PodRegex(),
SelfNamespace: config.Config.SelfNamespace,
WorkerResources: config.Config.Tap.Resources.Worker,
ImagePullPolicy: config.Config.ImagePullPolicy(),
SelfServiceAccountExists: state.selfServiceAccountExists,
ServiceMesh: config.Config.Tap.ServiceMesh,
Tls: config.Config.Tap.Tls,
Debug: config.Config.Tap.Debug,
}, startTime)
if err != nil {
return err
}
go func() {
for {
select {
case syncerErr, ok := <-workerSyncer.ErrorOut:
if !ok {
log.Debug().Msg("workerSyncer err channel closed, ending listener loop")
return
}
log.Error().Msg(getK8sTapManagerErrorText(syncerErr))
cancel()
case _, ok := <-workerSyncer.TapPodChangesOut:
if !ok {
log.Debug().Msg("workerSyncer pod changes channel closed, ending listener loop")
return
}
go connector.PostTargettedPodsToHub(workerSyncer.CurrentlyTargettedPods)
case pod, ok := <-workerSyncer.WorkerPodsChanges:
if !ok {
log.Debug().Msg("workerSyncer worker status changed channel closed, ending listener loop")
return
}
go connector.PostWorkerPodToHub(pod)
case <-ctx.Done():
log.Debug().Msg("workerSyncer event listener loop exiting due to context done")
return
}
}
}()
return nil
}
func printNoPodsFoundSuggestion(targetNamespaces []string) {
var suggestionStr string
if !utils.Contains(targetNamespaces, kubernetes.K8sAllNamespaces) {
@@ -184,19 +140,6 @@ func printNoPodsFoundSuggestion(targetNamespaces []string) {
log.Warn().Msg(fmt.Sprintf("Did not find any currently running pods that match the regex argument, %s will automatically target matching pods if any are created later%s", misc.Software, suggestionStr))
}
func getK8sTapManagerErrorText(err kubernetes.K8sTapManagerError) string {
switch err.TapManagerReason {
case kubernetes.TapManagerPodListError:
return fmt.Sprintf("Failed to update currently targetted pods: %v", err.OriginalError)
case kubernetes.TapManagerPodWatchError:
return fmt.Sprintf("Error occured in K8s pod watch: %v", err.OriginalError)
case kubernetes.TapManagerWorkerUpdateError:
return fmt.Sprintf("Error updating worker: %v", err.OriginalError)
default:
return fmt.Sprintf("Unknown error occured in K8s tap manager: %v", err.OriginalError)
}
}
func watchHubPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", kubernetes.HubPodName))
podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
@@ -214,9 +157,9 @@ func watchHubPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, c
switch wEvent.Type {
case kubernetes.EventAdded:
log.Info().Str("pod", kubernetes.HubPodName).Msg("Added pod.")
log.Info().Str("pod", kubernetes.HubPodName).Msg("Added:")
case kubernetes.EventDeleted:
log.Info().Str("pod", kubernetes.HubPodName).Msg("Removed pod.")
log.Info().Str("pod", kubernetes.HubPodName).Msg("Removed:")
cancel()
return
case kubernetes.EventModified:
@@ -294,9 +237,9 @@ func watchFrontPod(ctx context.Context, kubernetesProvider *kubernetes.Provider,
switch wEvent.Type {
case kubernetes.EventAdded:
log.Info().Str("pod", kubernetes.FrontPodName).Msg("Added pod.")
log.Info().Str("pod", kubernetes.FrontPodName).Msg("Added:")
case kubernetes.EventDeleted:
log.Info().Str("pod", kubernetes.FrontPodName).Msg("Removed pod.")
log.Info().Str("pod", kubernetes.FrontPodName).Msg("Removed:")
cancel()
return
case kubernetes.EventModified:
@@ -426,19 +369,50 @@ func watchHubEvents(ctx context.Context, kubernetesProvider *kubernetes.Provider
}
func postHubStarted(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
startProxyReportErrorIfAny(kubernetesProvider, ctx, cancel, kubernetes.HubServiceName, configStructs.ProxyFrontPortLabel, config.Config.Tap.Proxy.Hub.SrcPort, config.Config.Tap.Proxy.Hub.DstPort, "/echo")
startProxyReportErrorIfAny(
kubernetesProvider,
ctx,
kubernetes.HubServiceName,
kubernetes.HubPodName,
configStructs.ProxyHubPortLabel,
config.Config.Tap.Proxy.Hub.SrcPort,
config.Config.Tap.Proxy.Hub.DstPort,
"/echo",
)
if err := startWorkerSyncer(ctx, cancel, kubernetesProvider, state.targetNamespaces, state.startTime); err != nil {
log.Error().Err(errormessage.FormatError(err)).Msg("Error starting worker syncer")
cancel()
err := kubernetes.CreateWorkers(
kubernetesProvider,
state.selfServiceAccountExists,
ctx,
config.Config.SelfNamespace,
config.Config.Tap.Resources.Worker,
config.Config.ImagePullPolicy(),
config.Config.ImagePullSecrets(),
config.Config.Tap.ServiceMesh,
config.Config.Tap.Tls,
config.Config.Tap.Debug,
)
if err != nil {
log.Error().Err(err).Send()
}
connector.PostRegexToHub(config.Config.Tap.PodRegexStr, state.targetNamespaces)
url := kubernetes.GetLocalhostOnPort(config.Config.Tap.Proxy.Hub.SrcPort)
log.Info().Str("url", url).Msg(fmt.Sprintf(utils.Green, "Hub is available at:"))
}
func postFrontStarted(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
startProxyReportErrorIfAny(kubernetesProvider, ctx, cancel, kubernetes.FrontServiceName, configStructs.ProxyHubPortLabel, config.Config.Tap.Proxy.Front.SrcPort, config.Config.Tap.Proxy.Front.DstPort, "")
startProxyReportErrorIfAny(
kubernetesProvider,
ctx,
kubernetes.FrontServiceName,
kubernetes.FrontPodName,
configStructs.ProxyFrontPortLabel,
config.Config.Tap.Proxy.Front.SrcPort,
config.Config.Tap.Proxy.Front.DstPort,
"",
)
url := kubernetes.GetLocalhostOnPort(config.Config.Tap.Proxy.Front.SrcPort)
log.Info().Str("url", url).Msg(fmt.Sprintf(utils.Green, fmt.Sprintf("%s is available at:", misc.Software)))

View File

@@ -45,6 +45,15 @@ func (config *ConfigStruct) ImagePullPolicy() v1.PullPolicy {
return v1.PullPolicy(config.Tap.Docker.ImagePullPolicy)
}
func (config *ConfigStruct) ImagePullSecrets() []v1.LocalObjectReference {
var ref []v1.LocalObjectReference
for _, name := range config.Tap.Docker.ImagePullSecrets {
ref = append(ref, v1.LocalObjectReference{Name: name})
}
return ref
}
func (config *ConfigStruct) IsNsRestrictedMode() bool {
return config.SelfNamespace != misc.Program // Notice "kubeshark" string must match the default SelfNamespace
}

View File

@@ -10,34 +10,36 @@ import (
)
const (
DockerRegistryLabel = "docker-registry"
DockerTagLabel = "docker-tag"
ProxyFrontPortLabel = "proxy-front-port"
ProxyHubPortLabel = "proxy-hub-port"
ProxyHostLabel = "proxy-host"
NamespacesLabel = "namespaces"
AllNamespacesLabel = "allnamespaces"
StorageLimitLabel = "storagelimit"
DryRunLabel = "dryrun"
PcapLabel = "pcap"
ServiceMeshLabel = "servicemesh"
TlsLabel = "tls"
DebugLabel = "debug"
DockerRegistryLabel = "docker-registry"
DockerTagLabel = "docker-tag"
DockerImagePullPolicy = "docker-imagepullpolicy"
DockerImagePullSecrets = "docker-imagepullsecrets"
ProxyFrontPortLabel = "proxy-front-port"
ProxyHubPortLabel = "proxy-hub-port"
ProxyHostLabel = "proxy-host"
NamespacesLabel = "namespaces"
AllNamespacesLabel = "allnamespaces"
StorageLimitLabel = "storagelimit"
DryRunLabel = "dryrun"
PcapLabel = "pcap"
ServiceMeshLabel = "servicemesh"
TlsLabel = "tls"
DebugLabel = "debug"
)
type WorkerConfig struct {
SrcPort uint16 `yaml:"src-port" default:"8897"`
DstPort uint16 `yaml:"dst-port" default:"8897"`
SrcPort uint16 `yaml:"port" default:"8897"`
DstPort uint16 `yaml:"srvport" default:"8897"`
}
type HubConfig struct {
SrcPort uint16 `yaml:"src-port" default:"8898"`
DstPort uint16 `yaml:"dst-port" default:"8898"`
SrcPort uint16 `yaml:"port" default:"8898"`
DstPort uint16 `yaml:"srvport" default:"80"`
}
type FrontConfig struct {
SrcPort uint16 `yaml:"src-port" default:"8899"`
DstPort uint16 `yaml:"dst-port" default:"80"`
SrcPort uint16 `yaml:"port" default:"8899"`
DstPort uint16 `yaml:"srvport" default:"80"`
}
type ProxyConfig struct {
@@ -48,9 +50,10 @@ type ProxyConfig struct {
}
type DockerConfig struct {
Registry string `yaml:"registry" default:"docker.io/kubeshark"`
Tag string `yaml:"tag" default:"latest"`
ImagePullPolicy string `yaml:"imagepullpolicy" default:"Always"`
Registry string `yaml:"registry" default:"docker.io/kubeshark"`
Tag string `yaml:"tag" default:"latest"`
ImagePullPolicy string `yaml:"imagepullpolicy" default:"Always"`
ImagePullSecrets []string `yaml:"imagepullsecrets"`
}
type ResourcesConfig struct {

View File

@@ -1,6 +1,9 @@
package docker
import "fmt"
import (
"fmt"
"strings"
)
const (
hub = "hub"
@@ -9,7 +12,7 @@ const (
)
var (
registry = "docker.io/kubeshark"
registry = "docker.io/kubeshark/"
tag = "latest"
)
@@ -18,7 +21,11 @@ func GetRegistry() string {
}
func SetRegistry(value string) {
registry = value
if strings.HasPrefix(value, "docker.io/kubeshark") {
registry = "docker.io/kubeshark/"
} else {
registry = value
}
}
func GetTag() string {
@@ -30,7 +37,7 @@ func SetTag(value string) {
}
func getImage(image string) string {
return fmt.Sprintf("%s/%s:%s", registry, image, tag)
return fmt.Sprintf("%s%s:%s", registry, image, tag)
}
func GetHubImage() string {

View File

@@ -18,7 +18,7 @@ func FormatError(err error) error {
if k8serrors.IsForbidden(err) {
errorNew = fmt.Errorf("insufficient permissions: %w. "+
"supply the required permission or control %s's access to namespaces by setting %s "+
"in the config file or setting the targetted namespace with --%s %s=<NAMEPSACE>",
"in the config file or setting the targeted namespace with --%s %s=<NAMEPSACE>",
err,
misc.Software,
config.SelfNamespaceConfigName,

2
go.mod
View File

@@ -8,7 +8,7 @@ require (
github.com/docker/go-connections v0.4.0
github.com/docker/go-units v0.4.0
github.com/google/go-github/v37 v37.0.0
github.com/kubeshark/base v0.5.0
github.com/kubeshark/base v0.5.1
github.com/rs/zerolog v1.28.0
github.com/spf13/cobra v1.3.0
github.com/spf13/pflag v1.0.5

4
go.sum
View File

@@ -414,8 +414,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kubeshark/base v0.5.0 h1:ZQ/wNIdmLeDwy143korRZyPorcqBgf1y/tQIiIenAL0=
github.com/kubeshark/base v0.5.0/go.mod h1:/ZzBY+5KLaC7J6QUVXtZ0HZALhMcEDrU6Waux5/bHQc=
github.com/kubeshark/base v0.5.1 h1:msy1iQLgWQK1COoicwWxEDbeXU9J5RuptA5fYeOEzfA=
github.com/kubeshark/base v0.5.1/go.mod h1:/ZzBY+5KLaC7J6QUVXtZ0HZALhMcEDrU6Waux5/bHQc=
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de h1:9TO3cAIGXtEhnIaL+V+BEER86oLrvS+kWobKpbJuye0=
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE=
github.com/lithammer/dedent v1.1.0/go.mod h1:jrXYCQtgg0nJiN+StA2KgR7w6CiQNv9Fd/Z9BP0jIOc=

View File

@@ -12,7 +12,6 @@ import (
"github.com/kubeshark/kubeshark/utils"
"github.com/rs/zerolog/log"
core "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
)
@@ -64,7 +63,6 @@ func (connector *Connector) isReachable(path string) (bool, error) {
}
func (connector *Connector) PostWorkerPodToHub(pod *v1.Pod) {
// TODO: This request is responsible for proxy_server.go:147] Error while proxying request: context canceled log
postWorkerUrl := fmt.Sprintf("%s/pods/worker", connector.url)
if podMarshalled, err := json.Marshal(pod); err != nil {
@@ -116,22 +114,32 @@ func (connector *Connector) PostStorageLimitToHub(limit int64) {
}
}
func (connector *Connector) PostTargettedPodsToHub(pods []core.Pod) {
postTargettedUrl := fmt.Sprintf("%s/pods/targetted", connector.url)
type postRegexRequest struct {
Regex string `json:"regex"`
Namespaces []string `json:"namespaces"`
}
if podsMarshalled, err := json.Marshal(pods); err != nil {
log.Error().Err(err).Msg("Failed to marshal the targetted pods:")
func (connector *Connector) PostRegexToHub(regex string, namespaces []string) {
postRegexUrl := fmt.Sprintf("%s/pods/regex", connector.url)
payload := postRegexRequest{
Regex: regex,
Namespaces: namespaces,
}
if payloadMarshalled, err := json.Marshal(payload); err != nil {
log.Error().Err(err).Msg("Failed to marshal the payload:")
} else {
ok := false
for !ok {
if _, err = utils.Post(postTargettedUrl, "application/json", bytes.NewBuffer(podsMarshalled), connector.client); err != nil {
if _, err = utils.Post(postRegexUrl, "application/json", bytes.NewBuffer(payloadMarshalled), connector.client); err != nil {
if _, ok := err.(*url.Error); ok {
break
}
log.Debug().Err(err).Msg("Failed sending the targetted pods to Hub:")
log.Debug().Err(err).Msg("Failed sending the payload to Hub:")
} else {
ok = true
log.Debug().Int("pod-count", len(pods)).Msg("Reported targetted pods to Hub:")
log.Debug().Str("regex", regex).Strs("namespaces", namespaces).Msg("Reported payload to Hub:")
}
time.Sleep(time.Second)
}

View File

@@ -14,7 +14,6 @@ const (
ServiceAccountName = SelfResourcesPrefix + "service-account"
WorkerDaemonSetName = SelfResourcesPrefix + "worker-daemon-set"
WorkerPodName = SelfResourcesPrefix + "worker"
ConfigMapName = SelfResourcesPrefix + "config"
MinKubernetesServerVersion = "1.16.0"
)

View File

@@ -177,13 +177,11 @@ type PodOptions struct {
ServiceAccountName string
Resources Resources
ImagePullPolicy core.PullPolicy
ImagePullSecrets []core.LocalObjectReference
Debug bool
}
func (provider *Provider) BuildHubPod(opts *PodOptions) (*core.Pod, error) {
configMapVolume := &core.ConfigMapVolumeSource{}
configMapVolume.Name = ConfigMapName
cpuLimit, err := resource.ParseQuantity(opts.Resources.CpuLimit)
if err != nil {
return nil, fmt.Errorf("invalid cpu limit for %s container", opts.PodName)
@@ -251,6 +249,7 @@ func (provider *Provider) BuildHubPod(opts *PodOptions) (*core.Pod, error) {
Effect: core.TaintEffectNoSchedule,
},
},
ImagePullSecrets: opts.ImagePullSecrets,
},
}
@@ -262,9 +261,6 @@ func (provider *Provider) BuildHubPod(opts *PodOptions) (*core.Pod, error) {
}
func (provider *Provider) BuildFrontPod(opts *PodOptions, hubHost string, hubPort string) (*core.Pod, error) {
configMapVolume := &core.ConfigMapVolumeSource{}
configMapVolume.Name = ConfigMapName
cpuLimit, err := resource.ParseQuantity(opts.Resources.CpuLimit)
if err != nil {
return nil, fmt.Errorf("invalid cpu limit for %s container", opts.PodName)
@@ -315,7 +311,7 @@ func (provider *Provider) BuildFrontPod(opts *PodOptions, hubHost string, hubPor
Env: []core.EnvVar{
{
Name: "REACT_APP_DEFAULT_FILTER",
Value: "timestamp >= now()",
Value: " ",
},
{
Name: "REACT_APP_HUB_HOST",
@@ -353,6 +349,7 @@ func (provider *Provider) BuildFrontPod(opts *PodOptions, hubHost string, hubPor
Effect: core.TaintEffectNoSchedule,
},
},
ImagePullSecrets: opts.ImagePullSecrets,
},
}
@@ -416,11 +413,6 @@ func (provider *Provider) DoesNamespaceExist(ctx context.Context, name string) (
return provider.doesResourceExist(namespaceResource, err)
}
func (provider *Provider) DoesConfigMapExist(ctx context.Context, namespace string, name string) (bool, error) {
configMapResource, err := provider.clientSet.CoreV1().ConfigMaps(namespace).Get(ctx, name, metav1.GetOptions{})
return provider.doesResourceExist(configMapResource, err)
}
func (provider *Provider) DoesServiceAccountExist(ctx context.Context, namespace string, name string) (bool, error) {
serviceAccountResource, err := provider.clientSet.CoreV1().ServiceAccounts(namespace).Get(ctx, name, metav1.GetOptions{})
return provider.doesResourceExist(serviceAccountResource, err)
@@ -660,26 +652,21 @@ func (provider *Provider) ApplyWorkerDaemonSet(
daemonSetName string,
podImage string,
workerPodName string,
nodeNames []string,
serviceAccountName string,
resources Resources,
imagePullPolicy core.PullPolicy,
imagePullSecrets []core.LocalObjectReference,
serviceMesh bool,
tls bool,
debug bool,
) error {
log.Debug().
Int("node-count", len(nodeNames)).
Str("namespace", namespace).
Str("daemonset-name", daemonSetName).
Str("image", podImage).
Str("pod", workerPodName).
Msg("Applying worker DaemonSets.")
if len(nodeNames) == 0 {
return fmt.Errorf("DaemonSet %s must target at least 1 pod", daemonSetName)
}
command := []string{"./worker", "-i", "any", "-port", "8897"}
if debug {
@@ -759,22 +746,7 @@ func (provider *Provider) ApplyWorkerDaemonSet(
workerResources := applyconfcore.ResourceRequirements().WithRequests(workerResourceRequests).WithLimits(workerResourceLimits)
workerContainer.WithResources(workerResources)
matchFields := make([]*applyconfcore.NodeSelectorTermApplyConfiguration, 0)
for _, nodeName := range nodeNames {
nodeSelectorRequirement := applyconfcore.NodeSelectorRequirement()
nodeSelectorRequirement.WithKey("metadata.name")
nodeSelectorRequirement.WithOperator(core.NodeSelectorOpIn)
nodeSelectorRequirement.WithValues(nodeName)
nodeSelectorTerm := applyconfcore.NodeSelectorTerm()
nodeSelectorTerm.WithMatchFields(nodeSelectorRequirement)
matchFields = append(matchFields, nodeSelectorTerm)
}
nodeSelector := applyconfcore.NodeSelector()
nodeSelector.WithNodeSelectorTerms(matchFields...)
nodeAffinity := applyconfcore.NodeAffinity()
nodeAffinity.WithRequiredDuringSchedulingIgnoredDuringExecution(nodeSelector)
affinity := applyconfcore.Affinity()
affinity.WithNodeAffinity(nodeAffinity)
@@ -812,6 +784,14 @@ func (provider *Provider) ApplyWorkerDaemonSet(
podSpec.WithTolerations(noExecuteToleration, noScheduleToleration)
podSpec.WithVolumes(procfsVolume, sysfsVolume)
if len(imagePullSecrets) > 0 {
localObjectReference := applyconfcore.LocalObjectReference()
for _, secret := range imagePullSecrets {
localObjectReference.WithName(secret.Name)
}
podSpec.WithImagePullSecrets(localObjectReference)
}
podTemplate := applyconfcore.PodTemplateSpec()
podTemplate.WithLabels(map[string]string{
"app": workerPodName,

View File

@@ -21,7 +21,7 @@ import (
const k8sProxyApiPrefix = "/"
const selfServicePort = 80
func StartProxy(kubernetesProvider *Provider, proxyHost string, srcPort uint16, selfNamespace string, selfServiceName string, cancel context.CancelFunc) (*http.Server, error) {
func StartProxy(kubernetesProvider *Provider, proxyHost string, srcPort uint16, selfNamespace string, selfServiceName string) (*http.Server, error) {
log.Info().
Str("namespace", selfNamespace).
Str("service", selfServiceName).
@@ -55,7 +55,7 @@ func StartProxy(kubernetesProvider *Provider, proxyHost string, srcPort uint16,
go func() {
if err := server.Serve(l); err != nil && err != http.ErrServerClosed {
log.Error().Err(err).Msg("While creating proxy!")
cancel()
return
}
}()
@@ -99,7 +99,7 @@ func getRerouteHttpHandlerSelfStatic(proxyHandler http.Handler, selfNamespace st
})
}
func NewPortForward(kubernetesProvider *Provider, namespace string, podRegex *regexp.Regexp, srcPort uint16, dstPort uint16, ctx context.Context, cancel context.CancelFunc) (*portforward.PortForwarder, error) {
func NewPortForward(kubernetesProvider *Provider, namespace string, podRegex *regexp.Regexp, srcPort uint16, dstPort uint16, ctx context.Context) (*portforward.PortForwarder, error) {
pods, err := kubernetesProvider.ListAllRunningPodsMatchingRegex(ctx, podRegex, []string{namespace})
if err != nil {
return nil, err
@@ -132,7 +132,7 @@ func NewPortForward(kubernetesProvider *Provider, namespace string, podRegex *re
go func() {
if err = forwarder.ForwardPorts(); err != nil {
log.Error().Err(err).Msg("While Kubernetes port-forwarding!")
cancel()
return
}
}()

View File

@@ -1,26 +1,24 @@
package kubernetes
import (
"regexp"
"github.com/kubeshark/base/pkg/models"
core "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func GetNodeHostToTargettedPodsMap(targettedPods []core.Pod) models.NodeToPodsMap {
nodeToTargettedPodsMap := make(models.NodeToPodsMap)
for _, pod := range targettedPods {
func GetNodeHostToTargetedPodsMap(targetedPods []core.Pod) models.NodeToPodsMap {
nodeToTargetedPodsMap := make(models.NodeToPodsMap)
for _, pod := range targetedPods {
minimizedPod := getMinimizedPod(pod)
existingList := nodeToTargettedPodsMap[pod.Spec.NodeName]
existingList := nodeToTargetedPodsMap[pod.Spec.NodeName]
if existingList == nil {
nodeToTargettedPodsMap[pod.Spec.NodeName] = []core.Pod{minimizedPod}
nodeToTargetedPodsMap[pod.Spec.NodeName] = []core.Pod{minimizedPod}
} else {
nodeToTargettedPodsMap[pod.Spec.NodeName] = append(nodeToTargettedPodsMap[pod.Spec.NodeName], minimizedPod)
nodeToTargetedPodsMap[pod.Spec.NodeName] = append(nodeToTargetedPodsMap[pod.Spec.NodeName], minimizedPod)
}
}
return nodeToTargettedPodsMap
return nodeToTargetedPodsMap
}
func getMinimizedPod(fullPod core.Pod) core.Pod {
@@ -48,44 +46,6 @@ func getMinimizedContainerStatuses(fullPod core.Pod) []core.ContainerStatus {
return result
}
func excludeSelfPods(pods []core.Pod) []core.Pod {
selfPrefixRegex := regexp.MustCompile("^" + SelfResourcesPrefix)
nonSelfPods := make([]core.Pod, 0)
for _, pod := range pods {
if !selfPrefixRegex.MatchString(pod.Name) {
nonSelfPods = append(nonSelfPods, pod)
}
}
return nonSelfPods
}
func getPodArrayDiff(oldPods []core.Pod, newPods []core.Pod) (added []core.Pod, removed []core.Pod) {
added = getMissingPods(newPods, oldPods)
removed = getMissingPods(oldPods, newPods)
return added, removed
}
//returns pods present in pods1 array and missing in pods2 array
func getMissingPods(pods1 []core.Pod, pods2 []core.Pod) []core.Pod {
missingPods := make([]core.Pod, 0)
for _, pod1 := range pods1 {
var found = false
for _, pod2 := range pods2 {
if pod1.UID == pod2.UID {
found = true
break
}
}
if !found {
missingPods = append(missingPods, pod1)
}
}
return missingPods
}
func GetPodInfosForPods(pods []core.Pod) []*models.PodInfo {
podInfos := make([]*models.PodInfo, 0)
for _, pod := range pods {

View File

@@ -1,389 +0,0 @@
package kubernetes
import (
"context"
"fmt"
"regexp"
"time"
"github.com/kubeshark/base/pkg/models"
"github.com/kubeshark/kubeshark/debounce"
"github.com/kubeshark/kubeshark/docker"
"github.com/kubeshark/kubeshark/misc"
"github.com/kubeshark/kubeshark/utils"
"github.com/rs/zerolog/log"
v1 "k8s.io/api/core/v1"
)
const updateWorkersDelay = 5 * time.Second
type TargettedPodChangeEvent struct {
Added []v1.Pod
Removed []v1.Pod
}
// WorkerSyncer uses a k8s pod watch to update Worker daemonsets when targeted pods are removed or created
type WorkerSyncer struct {
startTime time.Time
context context.Context
CurrentlyTargettedPods []v1.Pod
config WorkerSyncerConfig
kubernetesProvider *Provider
TapPodChangesOut chan TargettedPodChangeEvent
WorkerPodsChanges chan *v1.Pod
ErrorOut chan K8sTapManagerError
nodeToTargettedPodMap models.NodeToPodsMap
targettedNodes []string
}
type WorkerSyncerConfig struct {
TargetNamespaces []string
PodFilterRegex regexp.Regexp
SelfNamespace string
WorkerResources Resources
ImagePullPolicy v1.PullPolicy
SelfServiceAccountExists bool
ServiceMesh bool
Tls bool
Debug bool
}
func CreateAndStartWorkerSyncer(ctx context.Context, kubernetesProvider *Provider, config WorkerSyncerConfig, startTime time.Time) (*WorkerSyncer, error) {
syncer := &WorkerSyncer{
startTime: startTime.Truncate(time.Second), // Round down because k8s CreationTimestamp is given in 1 sec resolution.
context: ctx,
CurrentlyTargettedPods: make([]v1.Pod, 0),
config: config,
kubernetesProvider: kubernetesProvider,
TapPodChangesOut: make(chan TargettedPodChangeEvent, 100),
WorkerPodsChanges: make(chan *v1.Pod, 100),
ErrorOut: make(chan K8sTapManagerError, 100),
}
if err, _ := syncer.updateCurrentlyTargettedPods(); err != nil {
return nil, err
}
if err := syncer.updateWorkers(); err != nil {
return nil, err
}
go syncer.watchPodsForTargetting()
go syncer.watchWorkerEvents()
go syncer.watchWorkerPods()
return syncer, nil
}
func (workerSyncer *WorkerSyncer) watchWorkerPods() {
selfResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", WorkerPodName))
podWatchHelper := NewPodWatchHelper(workerSyncer.kubernetesProvider, selfResourceRegex)
eventChan, errorChan := FilteredWatch(workerSyncer.context, podWatchHelper, []string{workerSyncer.config.SelfNamespace}, podWatchHelper)
for {
select {
case wEvent, ok := <-eventChan:
if !ok {
eventChan = nil
continue
}
pod, err := wEvent.ToPod()
if err != nil {
log.Error().Str("pod", WorkerPodName).Err(err).Msg(fmt.Sprintf("While parsing %s resource!", misc.Software))
continue
}
log.Debug().
Str("pod", pod.Name).
Str("node", pod.Spec.NodeName).
Interface("phase", pod.Status.Phase).
Msg("Watching pod events...")
if pod.Spec.NodeName != "" {
workerSyncer.WorkerPodsChanges <- pod
}
case err, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
log.Error().Str("pod", WorkerPodName).Err(err).Msg("While watching pod!")
case <-workerSyncer.context.Done():
log.Debug().
Str("pod", WorkerPodName).
Msg("Watching pod, context done.")
return
}
}
}
func (workerSyncer *WorkerSyncer) watchWorkerEvents() {
selfResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", WorkerPodName))
eventWatchHelper := NewEventWatchHelper(workerSyncer.kubernetesProvider, selfResourceRegex, "pod")
eventChan, errorChan := FilteredWatch(workerSyncer.context, eventWatchHelper, []string{workerSyncer.config.SelfNamespace}, eventWatchHelper)
for {
select {
case wEvent, ok := <-eventChan:
if !ok {
eventChan = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
log.Error().
Str("pod", WorkerPodName).
Err(err).
Msg("Parsing resource event.")
continue
}
log.Debug().
Str("pod", WorkerPodName).
Str("event", event.Name).
Time("time", event.CreationTimestamp.Time).
Str("name", event.Regarding.Name).
Str("kind", event.Regarding.Kind).
Str("reason", event.Reason).
Str("note", event.Note).
Msg("Watching events.")
pod, err1 := workerSyncer.kubernetesProvider.GetPod(workerSyncer.context, workerSyncer.config.SelfNamespace, event.Regarding.Name)
if err1 != nil {
log.Error().Str("name", event.Regarding.Name).Msg("Couldn't get pod")
continue
}
workerSyncer.WorkerPodsChanges <- pod
case err, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
log.Error().
Str("pod", WorkerPodName).
Err(err).
Msg("While watching events.")
case <-workerSyncer.context.Done():
log.Debug().
Str("pod", WorkerPodName).
Msg("Watching pod events, context done.")
return
}
}
}
func (workerSyncer *WorkerSyncer) watchPodsForTargetting() {
podWatchHelper := NewPodWatchHelper(workerSyncer.kubernetesProvider, &workerSyncer.config.PodFilterRegex)
eventChan, errorChan := FilteredWatch(workerSyncer.context, podWatchHelper, workerSyncer.config.TargetNamespaces, podWatchHelper)
handleChangeInPods := func() {
err, changeFound := workerSyncer.updateCurrentlyTargettedPods()
if err != nil {
workerSyncer.ErrorOut <- K8sTapManagerError{
OriginalError: err,
TapManagerReason: TapManagerPodListError,
}
}
if !changeFound {
log.Debug().Msg("Nothing changed. Updating workers is not needed.")
return
}
if err := workerSyncer.updateWorkers(); err != nil {
workerSyncer.ErrorOut <- K8sTapManagerError{
OriginalError: err,
TapManagerReason: TapManagerWorkerUpdateError,
}
}
}
restartWorkersDebouncer := debounce.NewDebouncer(updateWorkersDelay, handleChangeInPods)
for {
select {
case wEvent, ok := <-eventChan:
if !ok {
eventChan = nil
continue
}
pod, err := wEvent.ToPod()
if err != nil {
workerSyncer.handleErrorInWatchLoop(err, restartWorkersDebouncer)
continue
}
switch wEvent.Type {
case EventAdded:
log.Debug().
Str("pod", pod.Name).
Str("namespace", pod.Namespace).
Msg("Added matching pod.")
if err := restartWorkersDebouncer.SetOn(); err != nil {
log.Error().
Str("pod", pod.Name).
Str("namespace", pod.Namespace).
Err(err).
Msg("While restarting workers!")
}
case EventDeleted:
log.Debug().
Str("pod", pod.Name).
Str("namespace", pod.Namespace).
Msg("Removed matching pod.")
if err := restartWorkersDebouncer.SetOn(); err != nil {
log.Error().
Str("pod", pod.Name).
Str("namespace", pod.Namespace).
Err(err).
Msg("While restarting workers!")
}
case EventModified:
log.Debug().
Str("pod", pod.Name).
Str("namespace", pod.Namespace).
Str("ip", pod.Status.PodIP).
Interface("phase", pod.Status.Phase).
Msg("Modified matching pod.")
// Act only if the modified pod has already obtained an IP address.
// After filtering for IPs, on a normal pod restart this includes the following events:
// - Pod deletion
// - Pod reaches start state
// - Pod reaches ready state
// Ready/unready transitions might also trigger this event.
if pod.Status.PodIP != "" {
if err := restartWorkersDebouncer.SetOn(); err != nil {
log.Error().
Str("pod", pod.Name).
Str("namespace", pod.Namespace).
Err(err).
Msg("While restarting workers!")
}
}
case EventBookmark:
break
case EventError:
break
}
case err, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
workerSyncer.handleErrorInWatchLoop(err, restartWorkersDebouncer)
continue
case <-workerSyncer.context.Done():
log.Debug().Msg("Watching pods, context done. Stopping \"restart workers debouncer\"")
restartWorkersDebouncer.Cancel()
// TODO: Does this also perform cleanup?
return
}
}
}
func (workerSyncer *WorkerSyncer) handleErrorInWatchLoop(err error, restartWorkersDebouncer *debounce.Debouncer) {
log.Error().Err(err).Msg("While watching pods, got an error! Stopping \"restart workers debouncer\"")
restartWorkersDebouncer.Cancel()
workerSyncer.ErrorOut <- K8sTapManagerError{
OriginalError: err,
TapManagerReason: TapManagerPodWatchError,
}
}
func (workerSyncer *WorkerSyncer) updateCurrentlyTargettedPods() (err error, changesFound bool) {
if matchingPods, err := workerSyncer.kubernetesProvider.ListAllRunningPodsMatchingRegex(workerSyncer.context, &workerSyncer.config.PodFilterRegex, workerSyncer.config.TargetNamespaces); err != nil {
return err, false
} else {
podsToTarget := excludeSelfPods(matchingPods)
addedPods, removedPods := getPodArrayDiff(workerSyncer.CurrentlyTargettedPods, podsToTarget)
for _, addedPod := range addedPods {
log.Info().Str("pod", addedPod.Name).Msg("Currently targetting:")
}
for _, removedPod := range removedPods {
log.Info().Str("pod", removedPod.Name).Msg("Pod is no longer running. Targetting is stopped.")
}
if len(addedPods) > 0 || len(removedPods) > 0 {
workerSyncer.CurrentlyTargettedPods = podsToTarget
workerSyncer.nodeToTargettedPodMap = GetNodeHostToTargettedPodsMap(workerSyncer.CurrentlyTargettedPods)
workerSyncer.TapPodChangesOut <- TargettedPodChangeEvent{
Added: addedPods,
Removed: removedPods,
}
return nil, true
}
return nil, false
}
}
func (workerSyncer *WorkerSyncer) updateWorkers() error {
nodesToTarget := make([]string, len(workerSyncer.nodeToTargettedPodMap))
i := 0
for node := range workerSyncer.nodeToTargettedPodMap {
nodesToTarget[i] = node
i++
}
if utils.EqualStringSlices(nodesToTarget, workerSyncer.targettedNodes) {
log.Debug().Msg("Skipping apply, DaemonSet is up to date")
return nil
}
log.Debug().Strs("nodes", nodesToTarget).Msg("Updating DaemonSet to run on nodes.")
image := docker.GetWorkerImage()
if len(workerSyncer.nodeToTargettedPodMap) > 0 {
var serviceAccountName string
if workerSyncer.config.SelfServiceAccountExists {
serviceAccountName = ServiceAccountName
} else {
serviceAccountName = ""
}
nodeNames := make([]string, 0, len(workerSyncer.nodeToTargettedPodMap))
for nodeName := range workerSyncer.nodeToTargettedPodMap {
nodeNames = append(nodeNames, nodeName)
}
if err := workerSyncer.kubernetesProvider.ApplyWorkerDaemonSet(
workerSyncer.context,
workerSyncer.config.SelfNamespace,
WorkerDaemonSetName,
image,
WorkerPodName,
nodeNames,
serviceAccountName,
workerSyncer.config.WorkerResources,
workerSyncer.config.ImagePullPolicy,
workerSyncer.config.ServiceMesh,
workerSyncer.config.Tls,
workerSyncer.config.Debug); err != nil {
return err
}
log.Debug().Int("worker-count", len(workerSyncer.nodeToTargettedPodMap)).Msg("Successfully created workers.")
} else {
if err := workerSyncer.kubernetesProvider.ResetWorkerDaemonSet(
workerSyncer.context,
workerSyncer.config.SelfNamespace,
WorkerDaemonSetName,
image,
WorkerPodName); err != nil {
return err
}
log.Debug().Msg("Successfully resetted Worker DaemonSet")
}
workerSyncer.targettedNodes = nodesToTarget
return nil
}

54
kubernetes/workers.go Normal file
View File

@@ -0,0 +1,54 @@
package kubernetes
import (
"context"
"github.com/kubeshark/kubeshark/docker"
"github.com/rs/zerolog/log"
core "k8s.io/api/core/v1"
)
func CreateWorkers(
kubernetesProvider *Provider,
selfServiceAccountExists bool,
ctx context.Context,
namespace string,
resources Resources,
imagePullPolicy core.PullPolicy,
imagePullSecrets []core.LocalObjectReference,
serviceMesh bool,
tls bool,
debug bool,
) error {
image := docker.GetWorkerImage()
var serviceAccountName string
if selfServiceAccountExists {
serviceAccountName = ServiceAccountName
} else {
serviceAccountName = ""
}
log.Info().Msg("Creating the worker DaemonSet...")
if err := kubernetesProvider.ApplyWorkerDaemonSet(
ctx,
namespace,
WorkerDaemonSetName,
image,
WorkerPodName,
serviceAccountName,
resources,
imagePullPolicy,
imagePullSecrets,
serviceMesh,
tls,
debug,
); err != nil {
return err
}
log.Info().Msg("Successfully created the worker DaemonSet.")
return nil
}

View File

@@ -2,6 +2,7 @@ package main
import (
"os"
"strconv"
"time"
"github.com/kubeshark/kubeshark/cmd"
@@ -11,6 +12,20 @@ import (
func main() {
zerolog.SetGlobalLevel(zerolog.InfoLevel)
// Short caller (file:line)
zerolog.CallerMarshalFunc = func(pc uintptr, file string, line int) string {
short := file
for i := len(file) - 1; i > 0; i-- {
if file[i] == '/' {
short = file[i+1:]
break
}
}
file = short
return file + ":" + strconv.Itoa(line)
}
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: time.RFC3339}).With().Caller().Logger()
cmd.Execute()
}

View File

@@ -113,11 +113,6 @@ func cleanUpRestrictedMode(ctx context.Context, kubernetesProvider *kubernetes.P
handleDeletionError(err, resourceDesc, &leftoverResources)
}
if err := kubernetesProvider.RemoveConfigMap(ctx, selfResourcesNamespace, kubernetes.ConfigMapName); err != nil {
resourceDesc := fmt.Sprintf("ConfigMap %s in namespace %s", kubernetes.ConfigMapName, selfResourcesNamespace)
handleDeletionError(err, resourceDesc, &leftoverResources)
}
if resources, err := kubernetesProvider.ListManagedServiceAccounts(ctx, selfResourcesNamespace); err != nil {
resourceDesc := fmt.Sprintf("ServiceAccounts in namespace %s", selfResourcesNamespace)
handleDeletionError(err, resourceDesc, &leftoverResources)

View File

@@ -13,10 +13,10 @@ import (
core "k8s.io/api/core/v1"
)
func CreateHubResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, isNsRestrictedMode bool, selfNamespace string, hubResources kubernetes.Resources, imagePullPolicy core.PullPolicy, debug bool) (bool, error) {
func CreateHubResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, isNsRestrictedMode bool, selfNamespace string, hubResources kubernetes.Resources, imagePullPolicy core.PullPolicy, imagePullSecrets []core.LocalObjectReference, debug bool) (bool, error) {
if !isNsRestrictedMode {
if err := createSelfNamespace(ctx, kubernetesProvider, selfNamespace); err != nil {
return false, err
log.Debug().Err(err).Send()
}
}
@@ -39,6 +39,7 @@ func CreateHubResources(ctx context.Context, kubernetesProvider *kubernetes.Prov
ServiceAccountName: serviceAccountName,
Resources: hubResources,
ImagePullPolicy: imagePullPolicy,
ImagePullSecrets: imagePullSecrets,
Debug: debug,
}
@@ -49,6 +50,7 @@ func CreateHubResources(ctx context.Context, kubernetesProvider *kubernetes.Prov
ServiceAccountName: serviceAccountName,
Resources: hubResources,
ImagePullPolicy: imagePullPolicy,
ImagePullSecrets: imagePullSecrets,
Debug: debug,
}

View File

@@ -5,8 +5,8 @@ const (
Red = "\033[1;31m%s\033[0m"
Green = "\033[1;32m%s\033[0m"
Yellow = "\033[1;33m%s\033[0m"
Purple = "\033[1;34m%s\033[0m"
Blue = "\033[1;34m%s\033[0m"
Magenta = "\033[1;35m%s\033[0m"
Teal = "\033[1;36m%s\033[0m"
Cyan = "\033[1;36m%s\033[0m"
White = "\033[1;37m%s\033[0m"
)