Mirror of https://github.com/kubeshark/kubeshark.git (synced 2026-03-06 19:51:00 +00:00)

Compare commits

35 Commits:
a9fdde4110, 4eeab41e6d, dce7de6cc9, c809117b2c, 2c6c71cf49, 1533d1ec28, edf5c8cd6d,
6c69fb6bc4, 4dd941f8d9, 90294e32c1, 5f4a856c5e, 4e3233ade8, 846f253a03, f128ae3993,
38da25ecc8, bf777f9fca, 3c4272c6d1, 920535b643, b55dd5b072, f5cf15d657, fe623438c4,
1c35a9f143, d6766a8ac5, eaa21f0789, ac6aee07f5, 5b428c5636, 894f97ca41, 45f8c8a834,
75ff80218b, 6255e1d4ef, 8ea606fb59, c9d41c53ca, 3995dae16d, 447fcf1b31, 7e7fa772e1
.github/ISSUE_TEMPLATE/bug_report.md (vendored), 3 lines changed

@@ -10,6 +10,9 @@ assignees: ''
**Describe the bug**
A clear and concise description of what the bug is.

**Provide more information**
Running on EKS, AKS, GKE, Minikube, Rancher, OpenShift? Number of Nodes? CNI?

**To Reproduce**
Steps to reproduce the behavior:
1. Run `kubeshark <command> ...`
README.md, 92 lines changed

@@ -23,87 +23,61 @@
</a>
</p>

<p>
<p align="center">
Mizu (by UP9) is now Kubeshark, read more about it <a href="https://www.kubeshark.co/mizu-is-now-kubeshark">here</a>.
<b>
<a href="https://github.com/kubeshark/kubeshark/releases/latest">V38.2</a> is out with <a href="https://docs.kubeshark.co/en/pcap_export_import">PCAP</a> export, <a href="https://docs.kubeshark.co/en/dns">DNS</a>, <a href="https://docs.kubeshark.co/en/service_map">Identity-aware Service Map</a> and so much more. Read about it <a href="https://kubeshark.co/pcap-or-it-didnt-happen">here</a>.
</b>
</p>

Kubeshark, the API Traffic Viewer for kubernetes, provides deep visibility and monitoring of all API traffic and payloads going in, out and across containers and pods inside a Kubernetes cluster.

Think of a combination of Chrome Dev Tools, TCPDump and Wireshark, re-invented for Kubernetes.
**Kubeshark** is an API Traffic Viewer for [**Kubernetes**](https://kubernetes.io/) providing real-time, protocol-aware visibility into Kubernetes’ internal network, capturing, dissecting and monitoring all traffic and payloads going in, out and across containers, pods, nodes and clusters.



## Download
Think [TCPDump](https://en.wikipedia.org/wiki/Tcpdump) and [Wireshark](https://www.wireshark.org/) re-invented for Kubernetes

Kubeshark uses a ~45MB pre-compiled executable binary to communicate with the Kubernetes API. We recommend downloading the `kubeshark` CLI by using one of these options:
## Getting Started

- Choose the right binary, download and use directly from [the latest stable release](https://github.com/kubeshark/kubeshark/releases/latest).

- Use the shell script below :point_down: to automatically download the right binary for your operating system and CPU architecture:

```shell
sh <(curl -Ls https://kubeshark.co/install)
```

- Compile it from source using `make` command then use `./bin/kubeshark__` executable.

## Run

Use the `kubeshark` CLI to capture and view streaming API traffic in real time.
Download **Kubeshark**'s binary distribution [latest release](https://github.com/kubeshark/kubeshark/releases/latest) and run following one of these examples:

```shell
kubeshark tap
```

### Troubleshooting Installation

If something doesn't work or simply to play it safe prior to installing;

> Make sure you have access to https://hub.docker.com/

> Make sure `kubeshark` executable in your `PATH`.
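A quick way to verify both prerequisites from a terminal before touching the cluster (a rough sketch; keep in mind the cluster nodes, not just your workstation, also need to reach the registry):

```shell
# Is the kubeshark executable on the PATH?
command -v kubeshark || echo "kubeshark not found in PATH"

# Is Docker Hub reachable from this machine?
curl -sI https://hub.docker.com/ | head -n 1
```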
### Select Pods

#### Monitoring a Specific Pod:

```shell
kubeshark tap catalogue-b87b45784-sxc8q
```

#### Monitoring a Set of Pods Using Regex:

```shell
kubeshark tap "(catalo*|front-end*)"
```

### Specify the Namespace

By default, Kubeshark targets the `default` namespace.
To specify a different namespace:

```
kubeshark tap -n sock-shop
```

### Specify All Namespaces

The default strategy of Kubeshark waits for the new pods
to be created. To simply tap all existing namespaces run:

```
kubeshark tap -A
```

```shell
kubeshark tap -n sock-shop "(catalo*|front-end*)"
```

Running any of the :point_up: above commands will open the [Web UI](https://docs.kubeshark.co/en/ui) in your browser which streams the traffic in your Kubernetes cluster in real-time.

### Homebrew

[Homebrew](https://brew.sh/) :beer: users can add Kubeshark formulae with:

```shell
brew tap kubeshark/kubeshark
```

and install Kubeshark CLI with:

```shell
brew install kubeshark
```

## Building From Source

Clone this repository and run `make` command to build it. After the build is complete, the executable can be found at `./bin/kubeshark__`.
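A minimal sketch of that flow (the exact binary name under `./bin/` depends on your OS and CPU architecture, so the listing step below is just a way to find it):

```shell
git clone https://github.com/kubeshark/kubeshark.git
cd kubeshark
make          # builds the CLI; the executable is placed under ./bin/
ls ./bin/     # locate the binary built for your platform
```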
## Documentation

Visit our documentation website: [docs.kubeshark.co](https://docs.kubeshark.co)

The documentation resources are open-source and can be found on GitHub: [kubeshark/docs](https://github.com/kubeshark/docs)
To learn more, read the [documentation](https://docs.kubeshark.co).

## Contributing

We ❤️ pull requests! See [CONTRIBUTING.md](CONTRIBUTING.md) for the contribution guide.
We :heart: pull requests! See [CONTRIBUTING.md](CONTRIBUTING.md) for the contribution guide.

## Code of Conduct
@@ -1,123 +0,0 @@
package check

import (
	"context"
	"fmt"
	"regexp"
	"time"

	"github.com/kubeshark/kubeshark/docker"
	"github.com/kubeshark/kubeshark/kubernetes"
	"github.com/kubeshark/kubeshark/misc"
	"github.com/kubeshark/kubeshark/utils"
	"github.com/rs/zerolog/log"
	core "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func ImagePullInCluster(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool {
	log.Info().Str("procedure", "image-pull-in-cluster").Msg("Checking:")

	namespace := "default"
	podName := fmt.Sprintf("%s-test", misc.Program)

	defer func() {
		if err := kubernetesProvider.RemovePod(ctx, namespace, podName); err != nil {
			log.Error().
				Str("namespace", namespace).
				Str("pod", podName).
				Err(err).
				Msg("While removing test pod!")
		}
	}()

	if err := createImagePullInClusterPod(ctx, kubernetesProvider, namespace, podName); err != nil {
		log.Error().
			Str("namespace", namespace).
			Str("pod", podName).
			Err(err).
			Msg("While creating test pod!")
		return false
	}

	if err := checkImagePulled(ctx, kubernetesProvider, namespace, podName); err != nil {
		log.Printf("%v cluster is not able to pull %s containers from docker hub, err: %v", misc.Program, fmt.Sprintf(utils.Red, "✗"), err)
		log.Error().
			Str("namespace", namespace).
			Str("pod", podName).
			Err(err).
			Msg("Unable to pull images from Docker Hub!")
		return false
	}

	log.Info().
		Str("namespace", namespace).
		Str("pod", podName).
		Msg("Pulling images from Docker Hub is passed.")
	return true
}

func checkImagePulled(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespace string, podName string) error {
	podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", podName))
	podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
	eventChan, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{namespace}, podWatchHelper)

	timeAfter := time.After(30 * time.Second)

	for {
		select {
		case wEvent, ok := <-eventChan:
			if !ok {
				eventChan = nil
				continue
			}

			pod, err := wEvent.ToPod()
			if err != nil {
				return err
			}

			if pod.Status.Phase == core.PodRunning {
				return nil
			}
		case err, ok := <-errorChan:
			if !ok {
				errorChan = nil
				continue
			}

			return err
		case <-timeAfter:
			return fmt.Errorf("image not pulled in time")
		}
	}
}

func createImagePullInClusterPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespace string, podName string) error {
	image := docker.GetWorkerImage()
	log.Info().Str("image", image).Msg("Testing image pull:")
	var zero int64
	pod := &core.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: podName,
		},
		Spec: core.PodSpec{
			Containers: []core.Container{
				{
					Name:            "probe",
					Image:           image,
					ImagePullPolicy: "Always",
					Command:         []string{"cat"},
					Stdin:           true,
				},
			},
			TerminationGracePeriodSeconds: &zero,
		},
	}

	if _, err := kubernetesProvider.CreatePod(ctx, namespace, pod); err != nil {
		return err
	}

	return nil
}
@@ -15,9 +15,6 @@ func KubernetesResources(ctx context.Context, kubernetesProvider *kubernetes.Pro
	exist, err := kubernetesProvider.DoesNamespaceExist(ctx, config.Config.SelfNamespace)
	allResourcesExist := checkResourceExist(config.Config.SelfNamespace, "namespace", exist, err)

	exist, err = kubernetesProvider.DoesConfigMapExist(ctx, config.Config.SelfNamespace, kubernetes.ConfigMapName)
	allResourcesExist = checkResourceExist(kubernetes.ConfigMapName, "config map", exist, err) && allResourcesExist

	exist, err = kubernetesProvider.DoesServiceAccountExist(ctx, config.Config.SelfNamespace, kubernetes.ServiceAccountName)
	allResourcesExist = checkResourceExist(kubernetes.ServiceAccountName, "service account", exist, err) && allResourcesExist

@@ -33,9 +33,6 @@ func runCheck() {
		checkPassed = check.KubernetesPermissions(ctx, embedFS, kubernetesProvider)
	}

	if checkPassed {
		checkPassed = check.ImagePullInCluster(ctx, kubernetesProvider)
	}
	if checkPassed {
		checkPassed = check.KubernetesResources(ctx, kubernetesProvider)
	}
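These self-checks run as part of the CLI's diagnostics pass. Assuming the subcommand keeps the name suggested by `runCheck` and the binary is `kubeshark`, as elsewhere in this change set, the whole validation is triggered with:

```shell
kubeshark check
```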

@@ -10,5 +10,5 @@ func performCleanCommand() {
		return
	}

	finishSelfExecution(kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.SelfNamespace)
	finishSelfExecution(kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.SelfNamespace, false)
}

@@ -18,32 +18,32 @@ import (
	"github.com/rs/zerolog/log"
)

func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx context.Context, cancel context.CancelFunc, serviceName string, proxyPortLabel string, srcPort uint16, dstPort uint16, healthCheck string) {
	httpServer, err := kubernetes.StartProxy(kubernetesProvider, config.Config.Tap.Proxy.Host, srcPort, config.Config.SelfNamespace, serviceName, cancel)
func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx context.Context, serviceName string, podName string, proxyPortLabel string, srcPort uint16, dstPort uint16, healthCheck string) {
	httpServer, err := kubernetes.StartProxy(kubernetesProvider, config.Config.Tap.Proxy.Host, srcPort, config.Config.SelfNamespace, serviceName)
	if err != nil {
		log.Error().
			Err(errormessage.FormatError(err)).
			Msg(fmt.Sprintf("Error occured while running k8s proxy. Try setting different port by using --%s", proxyPortLabel))
		cancel()
			Msg(fmt.Sprintf("Error occured while running K8s proxy. Try setting different port using --%s", proxyPortLabel))
		return
	}

	connector := connect.NewConnector(kubernetes.GetLocalhostOnPort(srcPort), connect.DefaultRetries, connect.DefaultTimeout)
	if err := connector.TestConnection(healthCheck); err != nil {
		log.Error().Msg("Couldn't connect using proxy, stopping proxy and trying to create port-forward..")
		log.Warn().
			Str("service", serviceName).
			Msg("Couldn't connect using proxy, stopping proxy and trying to create port-forward...")
		if err := httpServer.Shutdown(ctx); err != nil {
			log.Error().
				Err(errormessage.FormatError(err)).
				Msg("Error occurred while stopping proxy.")
		}

		podRegex, _ := regexp.Compile(kubernetes.HubPodName)
		if _, err := kubernetes.NewPortForward(kubernetesProvider, config.Config.SelfNamespace, podRegex, srcPort, dstPort, ctx, cancel); err != nil {
		podRegex, _ := regexp.Compile(podName)
		if _, err := kubernetes.NewPortForward(kubernetesProvider, config.Config.SelfNamespace, podRegex, srcPort, dstPort, ctx); err != nil {
			log.Error().
				Str("pod-regex", podRegex.String()).
				Err(errormessage.FormatError(err)).
				Msg(fmt.Sprintf("Error occured while running port forward. Try setting different port by using --%s", proxyPortLabel))
			cancel()
				Msg(fmt.Sprintf("Error occured while running port forward. Try setting different port using --%s", proxyPortLabel))
			return
		}

@@ -53,7 +53,6 @@ func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx con
			Str("service", serviceName).
			Err(errormessage.FormatError(err)).
			Msg("Couldn't connect to service.")
		cancel()
		return
	}
}

@@ -97,11 +96,13 @@ func handleKubernetesProviderError(err error) {
	}
}

func finishSelfExecution(kubernetesProvider *kubernetes.Provider, isNsRestrictedMode bool, selfNamespace string) {
func finishSelfExecution(kubernetesProvider *kubernetes.Provider, isNsRestrictedMode bool, selfNamespace string, withoutCleanup bool) {
	removalCtx, cancel := context.WithTimeout(context.Background(), cleanupTimeout)
	defer cancel()
	dumpLogsIfNeeded(removalCtx, kubernetesProvider)
	resources.CleanUpSelfResources(removalCtx, cancel, kubernetesProvider, isNsRestrictedMode, selfNamespace)
	if !withoutCleanup {
		resources.CleanUpSelfResources(removalCtx, cancel, kubernetesProvider, isNsRestrictedMode, selfNamespace)
	}
}

func dumpLogsIfNeeded(ctx context.Context, kubernetesProvider *kubernetes.Provider) {

@@ -26,7 +26,7 @@ func runProxy() {
	exists, err := kubernetesProvider.DoesServiceExist(ctx, config.Config.SelfNamespace, kubernetes.FrontServiceName)
	if err != nil {
		log.Error().
			Str("service", misc.Program).
			Str("service", kubernetes.FrontServiceName).
			Err(err).
			Msg("Failed to found service!")
		cancel()

@@ -42,36 +42,96 @@ func runProxy() {
		return
	}

	url := kubernetes.GetLocalhostOnPort(config.Config.Tap.Proxy.Front.SrcPort)
	exists, err = kubernetesProvider.DoesServiceExist(ctx, config.Config.SelfNamespace, kubernetes.HubServiceName)
	if err != nil {
		log.Error().
			Str("service", kubernetes.HubServiceName).
			Err(err).
			Msg("Failed to found service!")
		cancel()
		return
	}

	response, err := http.Get(fmt.Sprintf("%s/", url))
	if !exists {
		log.Error().
			Str("service", kubernetes.HubServiceName).
			Str("command", fmt.Sprintf("%s %s", misc.Program, tapCmd.Use)).
			Msg("Service not found! You should run the command first:")
		cancel()
		return
	}

	var establishedProxy bool

	hubUrl := kubernetes.GetLocalhostOnPort(config.Config.Tap.Proxy.Hub.SrcPort)
	response, err := http.Get(fmt.Sprintf("%s/", hubUrl))
	if err == nil && response.StatusCode == 200 {
		log.Info().
			Str("service", kubernetes.HubServiceName).
			Int("port", int(config.Config.Tap.Proxy.Hub.SrcPort)).
			Msg("Found a running service.")

		okToOpen("Hub", hubUrl, true)
	} else {
		startProxyReportErrorIfAny(
			kubernetesProvider,
			ctx,
			kubernetes.HubServiceName,
			kubernetes.HubPodName,
			configStructs.ProxyHubPortLabel,
			config.Config.Tap.Proxy.Hub.SrcPort,
			config.Config.Tap.Proxy.Hub.DstPort,
			"/echo",
		)
		connector := connect.NewConnector(hubUrl, connect.DefaultRetries, connect.DefaultTimeout)
		if err := connector.TestConnection("/echo"); err != nil {
			log.Error().Msg(fmt.Sprintf(utils.Red, "Couldn't connect to Hub."))
			return
		}

		establishedProxy = true
		okToOpen("Hub", hubUrl, true)
	}

	frontUrl := kubernetes.GetLocalhostOnPort(config.Config.Tap.Proxy.Front.SrcPort)
	response, err = http.Get(fmt.Sprintf("%s/", frontUrl))
	if err == nil && response.StatusCode == 200 {
		log.Info().
			Str("service", kubernetes.FrontServiceName).
			Int("port", int(config.Config.Tap.Proxy.Front.SrcPort)).
			Msg("Found a running service.")

		okToOpen(url)
		return
	}
	log.Info().Msg("Establishing connection to K8s cluster...")
	startProxyReportErrorIfAny(kubernetesProvider, ctx, cancel, kubernetes.FrontServiceName, configStructs.ProxyFrontPortLabel, config.Config.Tap.Proxy.Front.SrcPort, config.Config.Tap.Proxy.Front.DstPort, "")
		okToOpen("Kubeshark", frontUrl, false)
	} else {
		startProxyReportErrorIfAny(
			kubernetesProvider,
			ctx,
			kubernetes.FrontServiceName,
			kubernetes.FrontPodName,
			configStructs.ProxyFrontPortLabel,
			config.Config.Tap.Proxy.Front.SrcPort,
			config.Config.Tap.Proxy.Front.DstPort,
			"",
		)
		connector := connect.NewConnector(frontUrl, connect.DefaultRetries, connect.DefaultTimeout)
		if err := connector.TestConnection(""); err != nil {
			log.Error().Msg(fmt.Sprintf(utils.Red, "Couldn't connect to Front."))
			return
		}

		connector := connect.NewConnector(url, connect.DefaultRetries, connect.DefaultTimeout)
		if err := connector.TestConnection(""); err != nil {
			log.Error().Msg(fmt.Sprintf(utils.Red, "Couldn't connect to Front."))
			return
		establishedProxy = true
		okToOpen("Kubeshark", frontUrl, false)
	}

	okToOpen(url)

	utils.WaitForTermination(ctx, cancel)
	if establishedProxy {
		utils.WaitForTermination(ctx, cancel)
	}
}

func okToOpen(url string) {
	log.Info().Str("url", url).Msg(fmt.Sprintf(utils.Green, fmt.Sprintf("%s is available at:", misc.Software)))
func okToOpen(name string, url string, noBrowser bool) {
	log.Info().Str("url", url).Msg(fmt.Sprintf(utils.Green, fmt.Sprintf("%s is available at:", name)))

	if !config.Config.HeadlessMode {
	if !config.Config.HeadlessMode && !noBrowser {
		utils.OpenBrowser(url)
	}
}

@@ -46,6 +46,8 @@ func init() {

	tapCmd.Flags().StringP(configStructs.DockerRegistryLabel, "r", defaultTapConfig.Docker.Registry, "The Docker registry that's hosting the images.")
	tapCmd.Flags().StringP(configStructs.DockerTagLabel, "t", defaultTapConfig.Docker.Tag, "The tag of the Docker images that are going to be pulled.")
	tapCmd.Flags().String(configStructs.DockerImagePullPolicy, defaultTapConfig.Docker.ImagePullPolicy, "ImagePullPolicy for the Docker images.")
	tapCmd.Flags().StringSlice(configStructs.DockerImagePullSecrets, defaultTapConfig.Docker.ImagePullSecrets, "ImagePullSecrets for the Docker images.")
	tapCmd.Flags().Uint16(configStructs.ProxyFrontPortLabel, defaultTapConfig.Proxy.Front.SrcPort, "Provide a custom port for the front-end proxy/port-forward.")
	tapCmd.Flags().Uint16(configStructs.ProxyHubPortLabel, defaultTapConfig.Proxy.Hub.SrcPort, "Provide a custom port for the Hub proxy/port-forward.")
	tapCmd.Flags().String(configStructs.ProxyHostLabel, defaultTapConfig.Proxy.Host, "Provide a custom host for the proxy/port-forward.")
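The two new flags expose the Docker image-pull settings on the tap command line. A hedged usage sketch (the secret name is hypothetical and must reference an existing Kubernetes image pull secret in the target cluster):

```shell
kubeshark tap -A \
  --docker-imagepullpolicy IfNotPresent \
  --docker-imagepullsecrets my-registry-secret
```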

cmd/tapRunner.go, 132 lines changed

@@ -71,9 +71,9 @@ func tap() {
		}
	}

	log.Info().Strs("namespaces", state.targetNamespaces).Msg("Targetting pods in:")
	log.Info().Strs("namespaces", state.targetNamespaces).Msg("Targeting pods in:")

	if err := printTargettedPodsPreview(ctx, kubernetesProvider, state.targetNamespaces); err != nil {
	if err := printTargetedPodsPreview(ctx, kubernetesProvider, state.targetNamespaces); err != nil {
		log.Error().Err(errormessage.FormatError(err)).Msg("Error listing pods!")
	}

@@ -82,10 +82,12 @@ func tap() {
	}

	log.Info().Msg(fmt.Sprintf("Waiting for the creation of %s resources...", misc.Software))
	if state.selfServiceAccountExists, err = resources.CreateHubResources(ctx, kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.SelfNamespace, config.Config.Tap.Resources.Hub, config.Config.ImagePullPolicy(), config.Config.Tap.Debug); err != nil {
	if state.selfServiceAccountExists, err = resources.CreateHubResources(ctx, kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.SelfNamespace, config.Config.Tap.Resources.Hub, config.Config.ImagePullPolicy(), config.Config.ImagePullSecrets(), config.Config.Tap.Debug); err != nil {
		var statusError *k8serrors.StatusError
		if errors.As(err, &statusError) && (statusError.ErrStatus.Reason == metav1.StatusReasonAlreadyExists) {
			log.Warn().Msg(fmt.Sprintf("%s is already running in this namespace, change the `selfnamespace` configuration or run `%s clean` to remove the currently running %s instance", misc.Software, misc.Program, misc.Software))
			log.Info().Msg(fmt.Sprintf("%s is already running in this namespace, change the `selfnamespace` configuration or run `%s clean` to remove the currently running %s instance.", misc.Software, misc.Program, misc.Software))
			connector.PostRegexToHub(config.Config.Tap.PodRegexStr, state.targetNamespaces)
			log.Info().Msg("Updated the targeted pods. Exiting.")
		} else {
			defer resources.CleanUpSelfResources(ctx, cancel, kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.SelfNamespace)
			log.Error().Err(errormessage.FormatError(err)).Msg("Error creating resources!")

@@ -102,10 +104,13 @@ func tap() {

	// block until exit signal or error
	utils.WaitForTermination(ctx, cancel)
	log.Warn().
		Str("command", fmt.Sprintf("%s proxy", misc.Program)).
		Msg(fmt.Sprintf(utils.Yellow, "To re-establish a proxy/port-forward, run:"))
}
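The warning above points the user at the proxy command once `tap` exits. Assuming the program name is `kubeshark`, as elsewhere in this change set, re-establishing the proxy/port-forward is simply:

```shell
kubeshark proxy
```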

func finishTapExecution(kubernetesProvider *kubernetes.Provider) {
	finishSelfExecution(kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.SelfNamespace)
	finishSelfExecution(kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.SelfNamespace, true)
}

/*
@@ -113,69 +118,20 @@ This function is a bit problematic as it might be detached from the actual pods
The alternative would be to wait for Hub to be ready and then query it for the pods it listens to, this has
the arguably worse drawback of taking a relatively very long time before the user sees which pods are targeted, if any.
*/
func printTargettedPodsPreview(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespaces []string) error {
func printTargetedPodsPreview(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespaces []string) error {
	if matchingPods, err := kubernetesProvider.ListAllRunningPodsMatchingRegex(ctx, config.Config.Tap.PodRegex(), namespaces); err != nil {
		return err
	} else {
		if len(matchingPods) == 0 {
			printNoPodsFoundSuggestion(namespaces)
		}
		for _, targettedPod := range matchingPods {
			log.Info().Msg(fmt.Sprintf("New pod: %s", fmt.Sprintf(utils.Green, targettedPod.Name)))
		for _, targetedPod := range matchingPods {
			log.Info().Msg(fmt.Sprintf("Targeted pod: %s", fmt.Sprintf(utils.Green, targetedPod.Name)))
		}
		return nil
	}
}

func startWorkerSyncer(ctx context.Context, cancel context.CancelFunc, provider *kubernetes.Provider, targetNamespaces []string, startTime time.Time) error {
	workerSyncer, err := kubernetes.CreateAndStartWorkerSyncer(ctx, provider, kubernetes.WorkerSyncerConfig{
		TargetNamespaces:         targetNamespaces,
		PodFilterRegex:           *config.Config.Tap.PodRegex(),
		SelfNamespace:            config.Config.SelfNamespace,
		WorkerResources:          config.Config.Tap.Resources.Worker,
		ImagePullPolicy:          config.Config.ImagePullPolicy(),
		SelfServiceAccountExists: state.selfServiceAccountExists,
		ServiceMesh:              config.Config.Tap.ServiceMesh,
		Tls:                      config.Config.Tap.Tls,
		Debug:                    config.Config.Tap.Debug,
	}, startTime)

	if err != nil {
		return err
	}

	go func() {
		for {
			select {
			case syncerErr, ok := <-workerSyncer.ErrorOut:
				if !ok {
					log.Debug().Msg("workerSyncer err channel closed, ending listener loop")
					return
				}
				log.Error().Msg(getK8sTapManagerErrorText(syncerErr))
				cancel()
			case _, ok := <-workerSyncer.TapPodChangesOut:
				if !ok {
					log.Debug().Msg("workerSyncer pod changes channel closed, ending listener loop")
					return
				}
				go connector.PostTargettedPodsToHub(workerSyncer.CurrentlyTargettedPods)
			case pod, ok := <-workerSyncer.WorkerPodsChanges:
				if !ok {
					log.Debug().Msg("workerSyncer worker status changed channel closed, ending listener loop")
					return
				}
				go connector.PostWorkerPodToHub(pod)
			case <-ctx.Done():
				log.Debug().Msg("workerSyncer event listener loop exiting due to context done")
				return
			}
		}
	}()

	return nil
}

func printNoPodsFoundSuggestion(targetNamespaces []string) {
	var suggestionStr string
	if !utils.Contains(targetNamespaces, kubernetes.K8sAllNamespaces) {

@@ -184,19 +140,6 @@ func printNoPodsFoundSuggestion(targetNamespaces []string) {
	log.Warn().Msg(fmt.Sprintf("Did not find any currently running pods that match the regex argument, %s will automatically target matching pods if any are created later%s", misc.Software, suggestionStr))
}

func getK8sTapManagerErrorText(err kubernetes.K8sTapManagerError) string {
	switch err.TapManagerReason {
	case kubernetes.TapManagerPodListError:
		return fmt.Sprintf("Failed to update currently targetted pods: %v", err.OriginalError)
	case kubernetes.TapManagerPodWatchError:
		return fmt.Sprintf("Error occured in K8s pod watch: %v", err.OriginalError)
	case kubernetes.TapManagerWorkerUpdateError:
		return fmt.Sprintf("Error updating worker: %v", err.OriginalError)
	default:
		return fmt.Sprintf("Unknown error occured in K8s tap manager: %v", err.OriginalError)
	}
}

func watchHubPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
	podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", kubernetes.HubPodName))
	podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)

@@ -214,9 +157,9 @@ func watchHubPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, c

		switch wEvent.Type {
		case kubernetes.EventAdded:
			log.Info().Str("pod", kubernetes.HubPodName).Msg("Added pod.")
			log.Info().Str("pod", kubernetes.HubPodName).Msg("Added:")
		case kubernetes.EventDeleted:
			log.Info().Str("pod", kubernetes.HubPodName).Msg("Removed pod.")
			log.Info().Str("pod", kubernetes.HubPodName).Msg("Removed:")
			cancel()
			return
		case kubernetes.EventModified:

@@ -294,9 +237,9 @@ func watchFrontPod(ctx context.Context, kubernetesProvider *kubernetes.Provider,

		switch wEvent.Type {
		case kubernetes.EventAdded:
			log.Info().Str("pod", kubernetes.FrontPodName).Msg("Added pod.")
			log.Info().Str("pod", kubernetes.FrontPodName).Msg("Added:")
		case kubernetes.EventDeleted:
			log.Info().Str("pod", kubernetes.FrontPodName).Msg("Removed pod.")
			log.Info().Str("pod", kubernetes.FrontPodName).Msg("Removed:")
			cancel()
			return
		case kubernetes.EventModified:

@@ -426,19 +369,50 @@ func watchHubEvents(ctx context.Context, kubernetesProvider *kubernetes.Provider
}

func postHubStarted(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
	startProxyReportErrorIfAny(kubernetesProvider, ctx, cancel, kubernetes.HubServiceName, configStructs.ProxyFrontPortLabel, config.Config.Tap.Proxy.Hub.SrcPort, config.Config.Tap.Proxy.Hub.DstPort, "/echo")
	startProxyReportErrorIfAny(
		kubernetesProvider,
		ctx,
		kubernetes.HubServiceName,
		kubernetes.HubPodName,
		configStructs.ProxyHubPortLabel,
		config.Config.Tap.Proxy.Hub.SrcPort,
		config.Config.Tap.Proxy.Hub.DstPort,
		"/echo",
	)

	if err := startWorkerSyncer(ctx, cancel, kubernetesProvider, state.targetNamespaces, state.startTime); err != nil {
		log.Error().Err(errormessage.FormatError(err)).Msg("Error starting worker syncer")
		cancel()
	err := kubernetes.CreateWorkers(
		kubernetesProvider,
		state.selfServiceAccountExists,
		ctx,
		config.Config.SelfNamespace,
		config.Config.Tap.Resources.Worker,
		config.Config.ImagePullPolicy(),
		config.Config.ImagePullSecrets(),
		config.Config.Tap.ServiceMesh,
		config.Config.Tap.Tls,
		config.Config.Tap.Debug,
	)
	if err != nil {
		log.Error().Err(err).Send()
	}

	connector.PostRegexToHub(config.Config.Tap.PodRegexStr, state.targetNamespaces)

	url := kubernetes.GetLocalhostOnPort(config.Config.Tap.Proxy.Hub.SrcPort)
	log.Info().Str("url", url).Msg(fmt.Sprintf(utils.Green, "Hub is available at:"))
}

func postFrontStarted(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
	startProxyReportErrorIfAny(kubernetesProvider, ctx, cancel, kubernetes.FrontServiceName, configStructs.ProxyHubPortLabel, config.Config.Tap.Proxy.Front.SrcPort, config.Config.Tap.Proxy.Front.DstPort, "")
	startProxyReportErrorIfAny(
		kubernetesProvider,
		ctx,
		kubernetes.FrontServiceName,
		kubernetes.FrontPodName,
		configStructs.ProxyFrontPortLabel,
		config.Config.Tap.Proxy.Front.SrcPort,
		config.Config.Tap.Proxy.Front.DstPort,
		"",
	)

	url := kubernetes.GetLocalhostOnPort(config.Config.Tap.Proxy.Front.SrcPort)
	log.Info().Str("url", url).Msg(fmt.Sprintf(utils.Green, fmt.Sprintf("%s is available at:", misc.Software)))

@@ -45,6 +45,15 @@ func (config *ConfigStruct) ImagePullPolicy() v1.PullPolicy {
	return v1.PullPolicy(config.Tap.Docker.ImagePullPolicy)
}

func (config *ConfigStruct) ImagePullSecrets() []v1.LocalObjectReference {
	var ref []v1.LocalObjectReference
	for _, name := range config.Tap.Docker.ImagePullSecrets {
		ref = append(ref, v1.LocalObjectReference{Name: name})
	}

	return ref
}

func (config *ConfigStruct) IsNsRestrictedMode() bool {
	return config.SelfNamespace != misc.Program // Notice "kubeshark" string must match the default SelfNamespace
}

@@ -10,34 +10,36 @@ import (
)

const (
	DockerRegistryLabel = "docker-registry"
	DockerTagLabel = "docker-tag"
	ProxyFrontPortLabel = "proxy-front-port"
	ProxyHubPortLabel = "proxy-hub-port"
	ProxyHostLabel = "proxy-host"
	NamespacesLabel = "namespaces"
	AllNamespacesLabel = "allnamespaces"
	StorageLimitLabel = "storagelimit"
	DryRunLabel = "dryrun"
	PcapLabel = "pcap"
	ServiceMeshLabel = "servicemesh"
	TlsLabel = "tls"
	DebugLabel = "debug"
	DockerRegistryLabel    = "docker-registry"
	DockerTagLabel         = "docker-tag"
	DockerImagePullPolicy  = "docker-imagepullpolicy"
	DockerImagePullSecrets = "docker-imagepullsecrets"
	ProxyFrontPortLabel    = "proxy-front-port"
	ProxyHubPortLabel      = "proxy-hub-port"
	ProxyHostLabel         = "proxy-host"
	NamespacesLabel        = "namespaces"
	AllNamespacesLabel     = "allnamespaces"
	StorageLimitLabel      = "storagelimit"
	DryRunLabel            = "dryrun"
	PcapLabel              = "pcap"
	ServiceMeshLabel       = "servicemesh"
	TlsLabel               = "tls"
	DebugLabel             = "debug"
)

type WorkerConfig struct {
	SrcPort uint16 `yaml:"src-port" default:"8897"`
	DstPort uint16 `yaml:"dst-port" default:"8897"`
	SrcPort uint16 `yaml:"port" default:"8897"`
	DstPort uint16 `yaml:"srvport" default:"8897"`
}

type HubConfig struct {
	SrcPort uint16 `yaml:"src-port" default:"8898"`
	DstPort uint16 `yaml:"dst-port" default:"8898"`
	SrcPort uint16 `yaml:"port" default:"8898"`
	DstPort uint16 `yaml:"srvport" default:"80"`
}

type FrontConfig struct {
	SrcPort uint16 `yaml:"src-port" default:"8899"`
	DstPort uint16 `yaml:"dst-port" default:"80"`
	SrcPort uint16 `yaml:"port" default:"8899"`
	DstPort uint16 `yaml:"srvport" default:"80"`
}

type ProxyConfig struct {

@@ -48,9 +50,10 @@ type ProxyConfig struct {
}

type DockerConfig struct {
	Registry string `yaml:"registry" default:"docker.io/kubeshark"`
	Tag string `yaml:"tag" default:"latest"`
	ImagePullPolicy string `yaml:"imagepullpolicy" default:"Always"`
	Registry         string   `yaml:"registry" default:"docker.io/kubeshark"`
	Tag              string   `yaml:"tag" default:"latest"`
	ImagePullPolicy  string   `yaml:"imagepullpolicy" default:"Always"`
	ImagePullSecrets []string `yaml:"imagepullsecrets"`
}
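Put together, the renamed port keys and the new image pull secrets list would look roughly like this in the CLI's YAML config. The nesting under `tap`, `proxy` and `docker` is inferred from `config.Config.Tap.Proxy` / `config.Config.Tap.Docker` and the structs above, so treat the layout as a sketch rather than the canonical file format; the secret name is a placeholder:

```yaml
tap:
  proxy:
    hub:
      port: 8898     # local source port (previously src-port)
      srvport: 80    # in-cluster service port (previously dst-port)
    front:
      port: 8899
      srvport: 80
  docker:
    registry: docker.io/kubeshark
    tag: latest
    imagepullpolicy: Always
    imagepullsecrets:
      - my-registry-secret   # hypothetical; must exist in the cluster
```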

type ResourcesConfig struct {

@@ -1,6 +1,9 @@
package docker

import "fmt"
import (
	"fmt"
	"strings"
)

const (
	hub = "hub"

@@ -9,7 +12,7 @@ const (
)

var (
	registry = "docker.io/kubeshark"
	registry = "docker.io/kubeshark/"
	tag = "latest"
)

@@ -18,7 +21,11 @@ func GetRegistry() string {
}

func SetRegistry(value string) {
	registry = value
	if strings.HasPrefix(value, "docker.io/kubeshark") {
		registry = "docker.io/kubeshark/"
	} else {
		registry = value
	}
}
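The registry override still comes from the existing --docker-registry / --docker-tag flags; only the default docker.io/kubeshark value is normalized to a trailing slash, while any other value is stored verbatim. Because getImage below now concatenates the registry string directly with the image name, a custom registry appears to need its own trailing slash. A hedged example with placeholder values:

```shell
kubeshark tap -r "ghcr.io/my-org/" -t v38.2
```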

func GetTag() string {

@@ -30,7 +37,7 @@ func SetTag(value string) {
}

func getImage(image string) string {
	return fmt.Sprintf("%s/%s:%s", registry, image, tag)
	return fmt.Sprintf("%s%s:%s", registry, image, tag)
}

func GetHubImage() string {

@@ -18,7 +18,7 @@ func FormatError(err error) error {
	if k8serrors.IsForbidden(err) {
		errorNew = fmt.Errorf("insufficient permissions: %w. "+
			"supply the required permission or control %s's access to namespaces by setting %s "+
			"in the config file or setting the targetted namespace with --%s %s=<NAMEPSACE>",
			"in the config file or setting the targeted namespace with --%s %s=<NAMEPSACE>",
			err,
			misc.Software,
			config.SelfNamespaceConfigName,

go.mod, 2 lines changed

@@ -8,7 +8,7 @@ require (
	github.com/docker/go-connections v0.4.0
	github.com/docker/go-units v0.4.0
	github.com/google/go-github/v37 v37.0.0
	github.com/kubeshark/base v0.5.0
	github.com/kubeshark/base v0.5.1
	github.com/rs/zerolog v1.28.0
	github.com/spf13/cobra v1.3.0
	github.com/spf13/pflag v1.0.5

go.sum, 4 lines changed

@@ -414,8 +414,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kubeshark/base v0.5.0 h1:ZQ/wNIdmLeDwy143korRZyPorcqBgf1y/tQIiIenAL0=
github.com/kubeshark/base v0.5.0/go.mod h1:/ZzBY+5KLaC7J6QUVXtZ0HZALhMcEDrU6Waux5/bHQc=
github.com/kubeshark/base v0.5.1 h1:msy1iQLgWQK1COoicwWxEDbeXU9J5RuptA5fYeOEzfA=
github.com/kubeshark/base v0.5.1/go.mod h1:/ZzBY+5KLaC7J6QUVXtZ0HZALhMcEDrU6Waux5/bHQc=
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de h1:9TO3cAIGXtEhnIaL+V+BEER86oLrvS+kWobKpbJuye0=
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE=
github.com/lithammer/dedent v1.1.0/go.mod h1:jrXYCQtgg0nJiN+StA2KgR7w6CiQNv9Fd/Z9BP0jIOc=

@@ -12,7 +12,6 @@ import (
	"github.com/kubeshark/kubeshark/utils"

	"github.com/rs/zerolog/log"
	core "k8s.io/api/core/v1"
	v1 "k8s.io/api/core/v1"
)

@@ -64,7 +63,6 @@ func (connector *Connector) isReachable(path string) (bool, error) {
}

func (connector *Connector) PostWorkerPodToHub(pod *v1.Pod) {
	// TODO: This request is responsible for proxy_server.go:147] Error while proxying request: context canceled log
	postWorkerUrl := fmt.Sprintf("%s/pods/worker", connector.url)

	if podMarshalled, err := json.Marshal(pod); err != nil {

@@ -116,22 +114,32 @@ func (connector *Connector) PostStorageLimitToHub(limit int64) {
	}
}

func (connector *Connector) PostTargettedPodsToHub(pods []core.Pod) {
	postTargettedUrl := fmt.Sprintf("%s/pods/targetted", connector.url)
type postRegexRequest struct {
	Regex      string   `json:"regex"`
	Namespaces []string `json:"namespaces"`
}

	if podsMarshalled, err := json.Marshal(pods); err != nil {
		log.Error().Err(err).Msg("Failed to marshal the targetted pods:")
func (connector *Connector) PostRegexToHub(regex string, namespaces []string) {
	postRegexUrl := fmt.Sprintf("%s/pods/regex", connector.url)

	payload := postRegexRequest{
		Regex:      regex,
		Namespaces: namespaces,
	}

	if payloadMarshalled, err := json.Marshal(payload); err != nil {
		log.Error().Err(err).Msg("Failed to marshal the payload:")
	} else {
		ok := false
		for !ok {
			if _, err = utils.Post(postTargettedUrl, "application/json", bytes.NewBuffer(podsMarshalled), connector.client); err != nil {
			if _, err = utils.Post(postRegexUrl, "application/json", bytes.NewBuffer(payloadMarshalled), connector.client); err != nil {
				if _, ok := err.(*url.Error); ok {
					break
				}
				log.Debug().Err(err).Msg("Failed sending the targetted pods to Hub:")
				log.Debug().Err(err).Msg("Failed sending the payload to Hub:")
			} else {
				ok = true
				log.Debug().Int("pod-count", len(pods)).Msg("Reported targetted pods to Hub:")
				log.Debug().Str("regex", regex).Strs("namespaces", namespaces).Msg("Reported payload to Hub:")
			}
			time.Sleep(time.Second)
		}
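For reference, the new endpoint accepts a small JSON body carrying the pod regex and the target namespaces. A hedged sketch of the equivalent request against a locally proxied Hub (8898 is the default Hub source port per HubConfig above; the regex and namespace are example values):

```shell
curl -X POST http://localhost:8898/pods/regex \
  -H "Content-Type: application/json" \
  -d '{"regex": "(catalo*|front-end*)", "namespaces": ["sock-shop"]}'
```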

@@ -14,7 +14,6 @@ const (
	ServiceAccountName = SelfResourcesPrefix + "service-account"
	WorkerDaemonSetName = SelfResourcesPrefix + "worker-daemon-set"
	WorkerPodName = SelfResourcesPrefix + "worker"
	ConfigMapName = SelfResourcesPrefix + "config"
	MinKubernetesServerVersion = "1.16.0"
)

@@ -177,13 +177,11 @@ type PodOptions struct {
	ServiceAccountName string
	Resources Resources
	ImagePullPolicy core.PullPolicy
	ImagePullSecrets []core.LocalObjectReference
	Debug bool
}

func (provider *Provider) BuildHubPod(opts *PodOptions) (*core.Pod, error) {
	configMapVolume := &core.ConfigMapVolumeSource{}
	configMapVolume.Name = ConfigMapName

	cpuLimit, err := resource.ParseQuantity(opts.Resources.CpuLimit)
	if err != nil {
		return nil, fmt.Errorf("invalid cpu limit for %s container", opts.PodName)

@@ -251,6 +249,7 @@ func (provider *Provider) BuildHubPod(opts *PodOptions) (*core.Pod, error) {
					Effect: core.TaintEffectNoSchedule,
				},
			},
			ImagePullSecrets: opts.ImagePullSecrets,
		},
	}

@@ -262,9 +261,6 @@ func (provider *Provider) BuildHubPod(opts *PodOptions) (*core.Pod, error) {
}

func (provider *Provider) BuildFrontPod(opts *PodOptions, hubHost string, hubPort string) (*core.Pod, error) {
	configMapVolume := &core.ConfigMapVolumeSource{}
	configMapVolume.Name = ConfigMapName

	cpuLimit, err := resource.ParseQuantity(opts.Resources.CpuLimit)
	if err != nil {
		return nil, fmt.Errorf("invalid cpu limit for %s container", opts.PodName)

@@ -315,7 +311,7 @@ func (provider *Provider) BuildFrontPod(opts *PodOptions, hubHost string, hubPor
				Env: []core.EnvVar{
					{
						Name: "REACT_APP_DEFAULT_FILTER",
						Value: "timestamp >= now()",
						Value: " ",
					},
					{
						Name: "REACT_APP_HUB_HOST",

@@ -353,6 +349,7 @@ func (provider *Provider) BuildFrontPod(opts *PodOptions, hubHost string, hubPor
					Effect: core.TaintEffectNoSchedule,
				},
			},
			ImagePullSecrets: opts.ImagePullSecrets,
		},
	}

@@ -416,11 +413,6 @@ func (provider *Provider) DoesNamespaceExist(ctx context.Context, name string) (
	return provider.doesResourceExist(namespaceResource, err)
}

func (provider *Provider) DoesConfigMapExist(ctx context.Context, namespace string, name string) (bool, error) {
	configMapResource, err := provider.clientSet.CoreV1().ConfigMaps(namespace).Get(ctx, name, metav1.GetOptions{})
	return provider.doesResourceExist(configMapResource, err)
}

func (provider *Provider) DoesServiceAccountExist(ctx context.Context, namespace string, name string) (bool, error) {
	serviceAccountResource, err := provider.clientSet.CoreV1().ServiceAccounts(namespace).Get(ctx, name, metav1.GetOptions{})
	return provider.doesResourceExist(serviceAccountResource, err)

@@ -660,26 +652,21 @@ func (provider *Provider) ApplyWorkerDaemonSet(
	daemonSetName string,
	podImage string,
	workerPodName string,
	nodeNames []string,
	serviceAccountName string,
	resources Resources,
	imagePullPolicy core.PullPolicy,
	imagePullSecrets []core.LocalObjectReference,
	serviceMesh bool,
	tls bool,
	debug bool,
) error {
	log.Debug().
		Int("node-count", len(nodeNames)).
		Str("namespace", namespace).
		Str("daemonset-name", daemonSetName).
		Str("image", podImage).
		Str("pod", workerPodName).
		Msg("Applying worker DaemonSets.")

	if len(nodeNames) == 0 {
		return fmt.Errorf("DaemonSet %s must target at least 1 pod", daemonSetName)
	}

	command := []string{"./worker", "-i", "any", "-port", "8897"}

	if debug {

@@ -759,22 +746,7 @@ func (provider *Provider) ApplyWorkerDaemonSet(
	workerResources := applyconfcore.ResourceRequirements().WithRequests(workerResourceRequests).WithLimits(workerResourceLimits)
	workerContainer.WithResources(workerResources)

	matchFields := make([]*applyconfcore.NodeSelectorTermApplyConfiguration, 0)
	for _, nodeName := range nodeNames {
		nodeSelectorRequirement := applyconfcore.NodeSelectorRequirement()
		nodeSelectorRequirement.WithKey("metadata.name")
		nodeSelectorRequirement.WithOperator(core.NodeSelectorOpIn)
		nodeSelectorRequirement.WithValues(nodeName)

		nodeSelectorTerm := applyconfcore.NodeSelectorTerm()
		nodeSelectorTerm.WithMatchFields(nodeSelectorRequirement)
		matchFields = append(matchFields, nodeSelectorTerm)
	}

	nodeSelector := applyconfcore.NodeSelector()
	nodeSelector.WithNodeSelectorTerms(matchFields...)
	nodeAffinity := applyconfcore.NodeAffinity()
	nodeAffinity.WithRequiredDuringSchedulingIgnoredDuringExecution(nodeSelector)
	affinity := applyconfcore.Affinity()
	affinity.WithNodeAffinity(nodeAffinity)

@@ -812,6 +784,14 @@ func (provider *Provider) ApplyWorkerDaemonSet(
	podSpec.WithTolerations(noExecuteToleration, noScheduleToleration)
	podSpec.WithVolumes(procfsVolume, sysfsVolume)

	if len(imagePullSecrets) > 0 {
		localObjectReference := applyconfcore.LocalObjectReference()
		for _, secret := range imagePullSecrets {
			localObjectReference.WithName(secret.Name)
		}
		podSpec.WithImagePullSecrets(localObjectReference)
	}

	podTemplate := applyconfcore.PodTemplateSpec()
	podTemplate.WithLabels(map[string]string{
		"app": workerPodName,

@@ -21,7 +21,7 @@ import (
const k8sProxyApiPrefix = "/"
const selfServicePort = 80

func StartProxy(kubernetesProvider *Provider, proxyHost string, srcPort uint16, selfNamespace string, selfServiceName string, cancel context.CancelFunc) (*http.Server, error) {
func StartProxy(kubernetesProvider *Provider, proxyHost string, srcPort uint16, selfNamespace string, selfServiceName string) (*http.Server, error) {
	log.Info().
		Str("namespace", selfNamespace).
		Str("service", selfServiceName).

@@ -55,7 +55,7 @@ func StartProxy(kubernetesProvider *Provider, proxyHost string, srcPort uint16,
	go func() {
		if err := server.Serve(l); err != nil && err != http.ErrServerClosed {
			log.Error().Err(err).Msg("While creating proxy!")
			cancel()
			return
		}
	}()

@@ -99,7 +99,7 @@ func getRerouteHttpHandlerSelfStatic(proxyHandler http.Handler, selfNamespace st
	})
}

func NewPortForward(kubernetesProvider *Provider, namespace string, podRegex *regexp.Regexp, srcPort uint16, dstPort uint16, ctx context.Context, cancel context.CancelFunc) (*portforward.PortForwarder, error) {
func NewPortForward(kubernetesProvider *Provider, namespace string, podRegex *regexp.Regexp, srcPort uint16, dstPort uint16, ctx context.Context) (*portforward.PortForwarder, error) {
	pods, err := kubernetesProvider.ListAllRunningPodsMatchingRegex(ctx, podRegex, []string{namespace})
	if err != nil {
		return nil, err

@@ -132,7 +132,7 @@ func NewPortForward(kubernetesProvider *Provider, namespace string, podRegex *re
	go func() {
		if err = forwarder.ForwardPorts(); err != nil {
			log.Error().Err(err).Msg("While Kubernetes port-forwarding!")
			cancel()
			return
		}
	}()
@@ -1,26 +1,24 @@
package kubernetes

import (
	"regexp"

	"github.com/kubeshark/base/pkg/models"
	core "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func GetNodeHostToTargettedPodsMap(targettedPods []core.Pod) models.NodeToPodsMap {
	nodeToTargettedPodsMap := make(models.NodeToPodsMap)
	for _, pod := range targettedPods {
func GetNodeHostToTargetedPodsMap(targetedPods []core.Pod) models.NodeToPodsMap {
	nodeToTargetedPodsMap := make(models.NodeToPodsMap)
	for _, pod := range targetedPods {
		minimizedPod := getMinimizedPod(pod)

		existingList := nodeToTargettedPodsMap[pod.Spec.NodeName]
		existingList := nodeToTargetedPodsMap[pod.Spec.NodeName]
		if existingList == nil {
			nodeToTargettedPodsMap[pod.Spec.NodeName] = []core.Pod{minimizedPod}
			nodeToTargetedPodsMap[pod.Spec.NodeName] = []core.Pod{minimizedPod}
		} else {
			nodeToTargettedPodsMap[pod.Spec.NodeName] = append(nodeToTargettedPodsMap[pod.Spec.NodeName], minimizedPod)
			nodeToTargetedPodsMap[pod.Spec.NodeName] = append(nodeToTargetedPodsMap[pod.Spec.NodeName], minimizedPod)
		}
	}
	return nodeToTargettedPodsMap
	return nodeToTargetedPodsMap
}

func getMinimizedPod(fullPod core.Pod) core.Pod {

@@ -48,44 +46,6 @@ func getMinimizedContainerStatuses(fullPod core.Pod) []core.ContainerStatus {
	return result
}

func excludeSelfPods(pods []core.Pod) []core.Pod {
	selfPrefixRegex := regexp.MustCompile("^" + SelfResourcesPrefix)

	nonSelfPods := make([]core.Pod, 0)
	for _, pod := range pods {
		if !selfPrefixRegex.MatchString(pod.Name) {
			nonSelfPods = append(nonSelfPods, pod)
		}
	}

	return nonSelfPods
}

func getPodArrayDiff(oldPods []core.Pod, newPods []core.Pod) (added []core.Pod, removed []core.Pod) {
	added = getMissingPods(newPods, oldPods)
	removed = getMissingPods(oldPods, newPods)

	return added, removed
}

//returns pods present in pods1 array and missing in pods2 array
func getMissingPods(pods1 []core.Pod, pods2 []core.Pod) []core.Pod {
	missingPods := make([]core.Pod, 0)
	for _, pod1 := range pods1 {
		var found = false
		for _, pod2 := range pods2 {
			if pod1.UID == pod2.UID {
				found = true
				break
			}
		}
		if !found {
			missingPods = append(missingPods, pod1)
		}
	}
	return missingPods
}

func GetPodInfosForPods(pods []core.Pod) []*models.PodInfo {
	podInfos := make([]*models.PodInfo, 0)
	for _, pod := range pods {
@@ -1,389 +0,0 @@
|
||||
package kubernetes
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"time"
|
||||
|
||||
"github.com/kubeshark/base/pkg/models"
|
||||
"github.com/kubeshark/kubeshark/debounce"
|
||||
"github.com/kubeshark/kubeshark/docker"
|
||||
"github.com/kubeshark/kubeshark/misc"
|
||||
"github.com/kubeshark/kubeshark/utils"
|
||||
"github.com/rs/zerolog/log"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
)
|
||||
|
||||
const updateWorkersDelay = 5 * time.Second
|
||||
|
||||
type TargettedPodChangeEvent struct {
|
||||
Added []v1.Pod
|
||||
Removed []v1.Pod
|
||||
}
|
||||
|
||||
// WorkerSyncer uses a k8s pod watch to update Worker daemonsets when targeted pods are removed or created
|
||||
type WorkerSyncer struct {
|
||||
startTime time.Time
|
||||
context context.Context
|
||||
CurrentlyTargettedPods []v1.Pod
|
||||
config WorkerSyncerConfig
|
||||
kubernetesProvider *Provider
|
||||
TapPodChangesOut chan TargettedPodChangeEvent
|
||||
WorkerPodsChanges chan *v1.Pod
|
||||
ErrorOut chan K8sTapManagerError
|
||||
nodeToTargettedPodMap models.NodeToPodsMap
|
||||
targettedNodes []string
|
||||
}
|
||||
|
||||
type WorkerSyncerConfig struct {
|
||||
TargetNamespaces []string
|
||||
PodFilterRegex regexp.Regexp
|
||||
SelfNamespace string
|
||||
WorkerResources Resources
|
||||
ImagePullPolicy v1.PullPolicy
|
||||
SelfServiceAccountExists bool
|
||||
ServiceMesh bool
|
||||
Tls bool
|
||||
Debug bool
|
||||
}
|
||||
|
||||
func CreateAndStartWorkerSyncer(ctx context.Context, kubernetesProvider *Provider, config WorkerSyncerConfig, startTime time.Time) (*WorkerSyncer, error) {
|
||||
syncer := &WorkerSyncer{
|
||||
startTime: startTime.Truncate(time.Second), // Round down because k8s CreationTimestamp is given in 1 sec resolution.
|
||||
context: ctx,
|
||||
CurrentlyTargettedPods: make([]v1.Pod, 0),
|
||||
config: config,
|
||||
kubernetesProvider: kubernetesProvider,
|
||||
TapPodChangesOut: make(chan TargettedPodChangeEvent, 100),
|
||||
WorkerPodsChanges: make(chan *v1.Pod, 100),
|
||||
ErrorOut: make(chan K8sTapManagerError, 100),
|
||||
}
|
||||
|
||||
if err, _ := syncer.updateCurrentlyTargettedPods(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := syncer.updateWorkers(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
go syncer.watchPodsForTargetting()
|
||||
go syncer.watchWorkerEvents()
|
||||
go syncer.watchWorkerPods()
|
||||
return syncer, nil
|
||||
}
|
||||
|
||||
func (workerSyncer *WorkerSyncer) watchWorkerPods() {
|
||||
selfResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", WorkerPodName))
|
||||
podWatchHelper := NewPodWatchHelper(workerSyncer.kubernetesProvider, selfResourceRegex)
|
||||
eventChan, errorChan := FilteredWatch(workerSyncer.context, podWatchHelper, []string{workerSyncer.config.SelfNamespace}, podWatchHelper)
|
||||
|
||||
for {
|
||||
select {
|
||||
case wEvent, ok := <-eventChan:
|
||||
if !ok {
|
||||
eventChan = nil
|
||||
continue
|
||||
}
|
||||
|
||||
pod, err := wEvent.ToPod()
|
||||
if err != nil {
|
||||
log.Error().Str("pod", WorkerPodName).Err(err).Msg(fmt.Sprintf("While parsing %s resource!", misc.Software))
|
||||
continue
|
||||
}
|
||||
|
||||
log.Debug().
|
||||
Str("pod", pod.Name).
|
||||
Str("node", pod.Spec.NodeName).
|
||||
Interface("phase", pod.Status.Phase).
|
||||
Msg("Watching pod events...")
|
||||
if pod.Spec.NodeName != "" {
|
||||
workerSyncer.WorkerPodsChanges <- pod
|
||||
}
|
||||
|
||||
case err, ok := <-errorChan:
|
||||
if !ok {
|
||||
errorChan = nil
|
||||
continue
|
||||
}
|
||||
log.Error().Str("pod", WorkerPodName).Err(err).Msg("While watching pod!")
|
||||
|
||||
case <-workerSyncer.context.Done():
|
||||
log.Debug().
|
||||
Str("pod", WorkerPodName).
|
||||
Msg("Watching pod, context done.")
|
||||
return
|
||||
}
|
||||
}
|
||||
}

func (workerSyncer *WorkerSyncer) watchWorkerEvents() {
	selfResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", WorkerPodName))
	eventWatchHelper := NewEventWatchHelper(workerSyncer.kubernetesProvider, selfResourceRegex, "pod")
	eventChan, errorChan := FilteredWatch(workerSyncer.context, eventWatchHelper, []string{workerSyncer.config.SelfNamespace}, eventWatchHelper)

	for {
		select {
		case wEvent, ok := <-eventChan:
			if !ok {
				eventChan = nil
				continue
			}

			event, err := wEvent.ToEvent()
			if err != nil {
				log.Error().
					Str("pod", WorkerPodName).
					Err(err).
					Msg("Parsing resource event.")
				continue
			}

			log.Debug().
				Str("pod", WorkerPodName).
				Str("event", event.Name).
				Time("time", event.CreationTimestamp.Time).
				Str("name", event.Regarding.Name).
				Str("kind", event.Regarding.Kind).
				Str("reason", event.Reason).
				Str("note", event.Note).
				Msg("Watching events.")

			pod, err1 := workerSyncer.kubernetesProvider.GetPod(workerSyncer.context, workerSyncer.config.SelfNamespace, event.Regarding.Name)
			if err1 != nil {
				log.Error().Str("name", event.Regarding.Name).Msg("Couldn't get pod")
				continue
			}

			workerSyncer.WorkerPodsChanges <- pod

		case err, ok := <-errorChan:
			if !ok {
				errorChan = nil
				continue
			}

			log.Error().
				Str("pod", WorkerPodName).
				Err(err).
				Msg("While watching events.")

		case <-workerSyncer.context.Done():
			log.Debug().
				Str("pod", WorkerPodName).
				Msg("Watching pod events, context done.")
			return
		}
	}
}

func (workerSyncer *WorkerSyncer) watchPodsForTargetting() {
	podWatchHelper := NewPodWatchHelper(workerSyncer.kubernetesProvider, &workerSyncer.config.PodFilterRegex)
	eventChan, errorChan := FilteredWatch(workerSyncer.context, podWatchHelper, workerSyncer.config.TargetNamespaces, podWatchHelper)

	handleChangeInPods := func() {
		err, changeFound := workerSyncer.updateCurrentlyTargettedPods()
		if err != nil {
			workerSyncer.ErrorOut <- K8sTapManagerError{
				OriginalError:    err,
				TapManagerReason: TapManagerPodListError,
			}
		}

		if !changeFound {
			log.Debug().Msg("Nothing changed. Updating workers is not needed.")
			return
		}
		if err := workerSyncer.updateWorkers(); err != nil {
			workerSyncer.ErrorOut <- K8sTapManagerError{
				OriginalError:    err,
				TapManagerReason: TapManagerWorkerUpdateError,
			}
		}
	}
	restartWorkersDebouncer := debounce.NewDebouncer(updateWorkersDelay, handleChangeInPods)

	for {
		select {
		case wEvent, ok := <-eventChan:
			if !ok {
				eventChan = nil
				continue
			}

			pod, err := wEvent.ToPod()
			if err != nil {
				workerSyncer.handleErrorInWatchLoop(err, restartWorkersDebouncer)
				continue
			}

			switch wEvent.Type {
			case EventAdded:
				log.Debug().
					Str("pod", pod.Name).
					Str("namespace", pod.Namespace).
					Msg("Added matching pod.")
				if err := restartWorkersDebouncer.SetOn(); err != nil {
					log.Error().
						Str("pod", pod.Name).
						Str("namespace", pod.Namespace).
						Err(err).
						Msg("While restarting workers!")
				}
			case EventDeleted:
				log.Debug().
					Str("pod", pod.Name).
					Str("namespace", pod.Namespace).
					Msg("Removed matching pod.")
				if err := restartWorkersDebouncer.SetOn(); err != nil {
					log.Error().
						Str("pod", pod.Name).
						Str("namespace", pod.Namespace).
						Err(err).
						Msg("While restarting workers!")
				}
			case EventModified:
				log.Debug().
					Str("pod", pod.Name).
					Str("namespace", pod.Namespace).
					Str("ip", pod.Status.PodIP).
					Interface("phase", pod.Status.Phase).
					Msg("Modified matching pod.")

				// Act only if the modified pod has already obtained an IP address.
				// After filtering for IPs, on a normal pod restart this includes the following events:
				// - Pod deletion
				// - Pod reaches start state
				// - Pod reaches ready state
				// Ready/unready transitions might also trigger this event.
				if pod.Status.PodIP != "" {
					if err := restartWorkersDebouncer.SetOn(); err != nil {
						log.Error().
							Str("pod", pod.Name).
							Str("namespace", pod.Namespace).
							Err(err).
							Msg("While restarting workers!")
					}
				}
			case EventBookmark:
				break
			case EventError:
				break
			}
		case err, ok := <-errorChan:
			if !ok {
				errorChan = nil
				continue
			}

			workerSyncer.handleErrorInWatchLoop(err, restartWorkersDebouncer)
			continue

		case <-workerSyncer.context.Done():
			log.Debug().Msg("Watching pods, context done. Stopping \"restart workers debouncer\"")
			restartWorkersDebouncer.Cancel()
			// TODO: Does this also perform cleanup?
			return
		}
	}
}
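
Bursts of pod add/delete/modify events are funneled through the debouncer so that many events within the delay window result in a single workers update. The sketch below illustrates that idea with only the standard library; the names (`Debouncer`, `SetOn`, `Cancel`) mirror how the code above uses Kubeshark's `debounce` package, but this is not that package's actual implementation.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Debouncer is a minimal sketch: repeated SetOn calls within the delay window
// collapse into a single invocation of the callback.
type Debouncer struct {
	mu       sync.Mutex
	delay    time.Duration
	callback func()
	timer    *time.Timer
	canceled bool
}

func NewDebouncer(delay time.Duration, callback func()) *Debouncer {
	return &Debouncer{delay: delay, callback: callback}
}

func (d *Debouncer) SetOn() error {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.canceled {
		return fmt.Errorf("debouncer is canceled")
	}
	if d.timer != nil {
		d.timer.Stop() // restart the window on every new event
	}
	d.timer = time.AfterFunc(d.delay, d.callback)
	return nil
}

func (d *Debouncer) Cancel() {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.canceled = true
	if d.timer != nil {
		d.timer.Stop()
	}
}

func main() {
	d := NewDebouncer(100*time.Millisecond, func() { fmt.Println("updating workers once") })
	for i := 0; i < 5; i++ {
		_ = d.SetOn() // five pod events...
	}
	time.Sleep(200 * time.Millisecond) // ...one workers update
}
```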

func (workerSyncer *WorkerSyncer) handleErrorInWatchLoop(err error, restartWorkersDebouncer *debounce.Debouncer) {
	log.Error().Err(err).Msg("While watching pods, got an error! Stopping \"restart workers debouncer\"")
	restartWorkersDebouncer.Cancel()
	workerSyncer.ErrorOut <- K8sTapManagerError{
		OriginalError:    err,
		TapManagerReason: TapManagerPodWatchError,
	}
}

func (workerSyncer *WorkerSyncer) updateCurrentlyTargettedPods() (err error, changesFound bool) {
	if matchingPods, err := workerSyncer.kubernetesProvider.ListAllRunningPodsMatchingRegex(workerSyncer.context, &workerSyncer.config.PodFilterRegex, workerSyncer.config.TargetNamespaces); err != nil {
		return err, false
	} else {
		podsToTarget := excludeSelfPods(matchingPods)
		addedPods, removedPods := getPodArrayDiff(workerSyncer.CurrentlyTargettedPods, podsToTarget)
		for _, addedPod := range addedPods {
			log.Info().Str("pod", addedPod.Name).Msg("Currently targetting:")
		}
		for _, removedPod := range removedPods {
			log.Info().Str("pod", removedPod.Name).Msg("Pod is no longer running. Targetting is stopped.")
		}
		if len(addedPods) > 0 || len(removedPods) > 0 {
			workerSyncer.CurrentlyTargettedPods = podsToTarget
			workerSyncer.nodeToTargettedPodMap = GetNodeHostToTargettedPodsMap(workerSyncer.CurrentlyTargettedPods)
			workerSyncer.TapPodChangesOut <- TargettedPodChangeEvent{
				Added:   addedPods,
				Removed: removedPods,
			}
			return nil, true
		}
		return nil, false
	}
}
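
`getPodArrayDiff` (defined elsewhere in this package) reports which matching pods appeared or disappeared since the last sync. The following is a hypothetical sketch of such a diff, keyed by namespace/name; it illustrates the idea only and is not the repository's implementation, whose keying may differ.

```go
package main

import (
	"fmt"

	core "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// podKey identifies a pod across two snapshots; keying by namespace/name is an
// assumption made for this sketch.
func podKey(p core.Pod) string { return p.Namespace + "/" + p.Name }

// diffPods returns pods present only in current (added) and only in previous (removed).
func diffPods(previous, current []core.Pod) (added, removed []core.Pod) {
	prev := make(map[string]bool, len(previous))
	for _, p := range previous {
		prev[podKey(p)] = true
	}
	curr := make(map[string]bool, len(current))
	for _, p := range current {
		curr[podKey(p)] = true
		if !prev[podKey(p)] {
			added = append(added, p)
		}
	}
	for _, p := range previous {
		if !curr[podKey(p)] {
			removed = append(removed, p)
		}
	}
	return added, removed
}

func main() {
	old := []core.Pod{{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "api-1"}}}
	now := []core.Pod{{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "api-2"}}}
	added, removed := diffPods(old, now)
	fmt.Println(len(added), "added,", len(removed), "removed") // 1 added, 1 removed
}
```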

func (workerSyncer *WorkerSyncer) updateWorkers() error {
	nodesToTarget := make([]string, len(workerSyncer.nodeToTargettedPodMap))
	i := 0
	for node := range workerSyncer.nodeToTargettedPodMap {
		nodesToTarget[i] = node
		i++
	}

	if utils.EqualStringSlices(nodesToTarget, workerSyncer.targettedNodes) {
		log.Debug().Msg("Skipping apply, DaemonSet is up to date")
		return nil
	}

	log.Debug().Strs("nodes", nodesToTarget).Msg("Updating DaemonSet to run on nodes.")

	image := docker.GetWorkerImage()

	if len(workerSyncer.nodeToTargettedPodMap) > 0 {
		var serviceAccountName string
		if workerSyncer.config.SelfServiceAccountExists {
			serviceAccountName = ServiceAccountName
		} else {
			serviceAccountName = ""
		}

		nodeNames := make([]string, 0, len(workerSyncer.nodeToTargettedPodMap))
		for nodeName := range workerSyncer.nodeToTargettedPodMap {
			nodeNames = append(nodeNames, nodeName)
		}

		if err := workerSyncer.kubernetesProvider.ApplyWorkerDaemonSet(
			workerSyncer.context,
			workerSyncer.config.SelfNamespace,
			WorkerDaemonSetName,
			image,
			WorkerPodName,
			nodeNames,
			serviceAccountName,
			workerSyncer.config.WorkerResources,
			workerSyncer.config.ImagePullPolicy,
			workerSyncer.config.ServiceMesh,
			workerSyncer.config.Tls,
			workerSyncer.config.Debug); err != nil {
			return err
		}

		log.Debug().Int("worker-count", len(workerSyncer.nodeToTargettedPodMap)).Msg("Successfully created workers.")
	} else {
		if err := workerSyncer.kubernetesProvider.ResetWorkerDaemonSet(
			workerSyncer.context,
			workerSyncer.config.SelfNamespace,
			WorkerDaemonSetName,
			image,
			WorkerPodName); err != nil {
			return err
		}

		log.Debug().Msg("Successfully resetted Worker DaemonSet")
	}

	workerSyncer.targettedNodes = nodesToTarget

	return nil
}

kubernetes/workers.go (new file, 54 lines)
@@ -0,0 +1,54 @@
package kubernetes

import (
	"context"

	"github.com/kubeshark/kubeshark/docker"
	"github.com/rs/zerolog/log"
	core "k8s.io/api/core/v1"
)

func CreateWorkers(
	kubernetesProvider *Provider,
	selfServiceAccountExists bool,
	ctx context.Context,
	namespace string,
	resources Resources,
	imagePullPolicy core.PullPolicy,
	imagePullSecrets []core.LocalObjectReference,
	serviceMesh bool,
	tls bool,
	debug bool,
) error {
	image := docker.GetWorkerImage()

	var serviceAccountName string
	if selfServiceAccountExists {
		serviceAccountName = ServiceAccountName
	} else {
		serviceAccountName = ""
	}

	log.Info().Msg("Creating the worker DaemonSet...")

	if err := kubernetesProvider.ApplyWorkerDaemonSet(
		ctx,
		namespace,
		WorkerDaemonSetName,
		image,
		WorkerPodName,
		serviceAccountName,
		resources,
		imagePullPolicy,
		imagePullSecrets,
		serviceMesh,
		tls,
		debug,
	); err != nil {
		return err
	}

	log.Info().Msg("Successfully created the worker DaemonSet.")

	return nil
}
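
A minimal sketch of how `CreateWorkers` might be invoked from the CLI layer, assuming a `kubernetes.Provider` has already been constructed; the namespace, pull policy, and the `deployWorkers` helper itself are placeholders for illustration, not the actual tap command wiring.

```go
package cmd

import (
	"context"

	"github.com/kubeshark/kubeshark/kubernetes"
	"github.com/rs/zerolog/log"
	core "k8s.io/api/core/v1"
)

// deployWorkers is a hypothetical helper showing one way CreateWorkers could be called.
func deployWorkers(ctx context.Context, provider *kubernetes.Provider) error {
	err := kubernetes.CreateWorkers(
		provider,
		true,                   // selfServiceAccountExists
		ctx,
		"kubeshark",            // target namespace (assumed)
		kubernetes.Resources{}, // worker resource requests/limits
		core.PullAlways,        // imagePullPolicy
		nil,                    // imagePullSecrets
		false,                  // serviceMesh
		false,                  // tls
		false,                  // debug
	)
	if err != nil {
		log.Error().Err(err).Msg("Failed to create the worker DaemonSet")
	}
	return err
}
```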

kubeshark.go (15 lines changed)
@@ -2,6 +2,7 @@ package main

import (
	"os"
	"strconv"
	"time"

	"github.com/kubeshark/kubeshark/cmd"
@@ -11,6 +12,20 @@ import (

func main() {
	zerolog.SetGlobalLevel(zerolog.InfoLevel)

	// Short caller (file:line)
	zerolog.CallerMarshalFunc = func(pc uintptr, file string, line int) string {
		short := file
		for i := len(file) - 1; i > 0; i-- {
			if file[i] == '/' {
				short = file[i+1:]
				break
			}
		}
		file = short
		return file + ":" + strconv.Itoa(line)
	}

	log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: time.RFC3339}).With().Caller().Logger()
	cmd.Execute()
}
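
The `CallerMarshalFunc` above trims the caller's file path to its base name, so console log lines carry a compact `file.go:line` caller instead of the full path. A small standalone sketch of the same trimming (the sample path is illustrative):

```go
package main

import (
	"fmt"
	"strconv"
)

// shortCaller mirrors the trimming done by the CallerMarshalFunc above:
// keep only the file's base name and append the line number.
func shortCaller(file string, line int) string {
	short := file
	for i := len(file) - 1; i > 0; i-- {
		if file[i] == '/' {
			short = file[i+1:]
			break
		}
	}
	return short + ":" + strconv.Itoa(line)
}

func main() {
	fmt.Println(shortCaller("/go/src/github.com/kubeshark/kubeshark/cmd/tap.go", 42)) // tap.go:42
}
```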

@@ -113,11 +113,6 @@ func cleanUpRestrictedMode(ctx context.Context, kubernetesProvider *kubernetes.P
		handleDeletionError(err, resourceDesc, &leftoverResources)
	}

	if err := kubernetesProvider.RemoveConfigMap(ctx, selfResourcesNamespace, kubernetes.ConfigMapName); err != nil {
		resourceDesc := fmt.Sprintf("ConfigMap %s in namespace %s", kubernetes.ConfigMapName, selfResourcesNamespace)
		handleDeletionError(err, resourceDesc, &leftoverResources)
	}

	if resources, err := kubernetesProvider.ListManagedServiceAccounts(ctx, selfResourcesNamespace); err != nil {
		resourceDesc := fmt.Sprintf("ServiceAccounts in namespace %s", selfResourcesNamespace)
		handleDeletionError(err, resourceDesc, &leftoverResources)

@@ -13,10 +13,10 @@ import (
	core "k8s.io/api/core/v1"
)

func CreateHubResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, isNsRestrictedMode bool, selfNamespace string, hubResources kubernetes.Resources, imagePullPolicy core.PullPolicy, debug bool) (bool, error) {
func CreateHubResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, isNsRestrictedMode bool, selfNamespace string, hubResources kubernetes.Resources, imagePullPolicy core.PullPolicy, imagePullSecrets []core.LocalObjectReference, debug bool) (bool, error) {
	if !isNsRestrictedMode {
		if err := createSelfNamespace(ctx, kubernetesProvider, selfNamespace); err != nil {
			return false, err
			log.Debug().Err(err).Send()
		}
	}

@@ -39,6 +39,7 @@ func CreateHubResources(ctx context.Context, kubernetesProvider *kubernetes.Prov
		ServiceAccountName: serviceAccountName,
		Resources:          hubResources,
		ImagePullPolicy:    imagePullPolicy,
		ImagePullSecrets:   imagePullSecrets,
		Debug:              debug,
	}

@@ -49,6 +50,7 @@ func CreateHubResources(ctx context.Context, kubernetesProvider *kubernetes.Prov
		ServiceAccountName: serviceAccountName,
		Resources:          hubResources,
		ImagePullPolicy:    imagePullPolicy,
		ImagePullSecrets:   imagePullSecrets,
		Debug:              debug,
	}

@@ -5,8 +5,8 @@ const (
	Red     = "\033[1;31m%s\033[0m"
	Green   = "\033[1;32m%s\033[0m"
	Yellow  = "\033[1;33m%s\033[0m"
	Purple  = "\033[1;34m%s\033[0m"
	Blue    = "\033[1;34m%s\033[0m"
	Magenta = "\033[1;35m%s\033[0m"
	Teal    = "\033[1;36m%s\033[0m"
	Cyan    = "\033[1;36m%s\033[0m"
	White   = "\033[1;37m%s\033[0m"
)
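
These constants are Printf-style format strings that wrap their argument in an ANSI color escape and reset the color afterwards. A minimal usage sketch, with the constant declared locally for illustration rather than imported from the package above:

```go
package main

import "fmt"

// Same format-string style as the constants above: bold red text, then reset.
const Red = "\033[1;31m%s\033[0m"

func main() {
	fmt.Printf(Red+"\n", "Error: worker DaemonSet could not be created")
}
```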