Compare commits


89 Commits
38.0 ... 39.0

Author SHA1 Message Date
M. Mert Yildiran
bdf81380d8 Remove the dots at the end of command flag descriptions 2023-03-18 00:47:44 +03:00
M. Mert Yildiran
889730e9e5 🔨 Viewer to Analyzer 2023-03-17 20:09:09 +03:00
Alon Girmonsky
3b3927ce3a Preparing for the release 2023-03-17 00:23:16 -07:00
Alon Girmonsky
8e07226060 Preparing for a new release 2023-03-17 00:19:01 -07:00
M. Mert Yildiran
b7f0dd7f3c 🔨 Rename the newly added customLabels field to resourceLabels in the config 2023-03-16 02:34:41 +03:00
M. Mert Yildiran
d9ec538aff Add customLabels field to config.yaml 2023-03-14 23:45:41 +03:00
M. Mert Yildiran
362b17dec4 🔥 Remove a log message 2023-03-13 22:54:10 +03:00
M. Mert Yildiran
4216f07ec7 🐛 os.Exit after returning from the HTTP handler 2023-03-13 22:51:48 +03:00
M. Mert Yildiran
a4d35599df Change some logs 2023-03-13 22:45:57 +03:00
M. Mert Yildiran
bcc1a36d71 👕 Fix the linter error 2023-03-13 22:42:56 +03:00
M. Mert Yildiran
b4a3a0451e Increase DefaultSleep value 2023-03-13 22:42:17 +03:00
M. Mert Yildiran
1a2892d46e Increase DefaultSleep value 2023-03-13 22:35:03 +03:00
M. Mert Yildiran
db8c9ec163 Ask for license key input 2023-03-10 18:41:38 +03:00
M. Mert Yildiran
b11619fbfa Panic if WriteConfig fails in pro command 2023-03-09 19:50:17 +03:00
M. Mert Yildiran
49ce15f9da Update some command descriptions 2023-03-09 19:48:42 +03:00
M. Mert Yildiran
65c5df3211 Add pro command to acquire a Pro license 2023-03-09 19:48:16 +03:00
M. Mert Yildiran
de7f598001 🐛 Fix an issue in the script title retrieval from the first comment once more 2023-03-08 01:04:20 +03:00
Christopher Redwine
28fa6e494f Upgrade deprecated set-output GitHub Actions syntax to avoid warnings (#1314)
Co-authored-by: M. Mert Yildiran <me@mertyildiran.com>
2023-03-08 00:29:32 +03:00
M. Mert Yildiran
0c68c0f99f 🐛 Fix an issue in the script title retrieval from the first comment 2023-03-08 00:14:35 +03:00
M. Mert Yildiran
cedb7bc8bc Watch scripts inside tap command 2023-03-07 20:21:28 +03:00
M. Mert Yildiran
ca35177b44 Change the log printing strategy in console command 2023-03-05 21:08:44 +03:00
M. Mert Yildiran
08b96c2663 👷 Fix brew tap reports 0.0 as version number 2023-03-05 21:00:35 +03:00
M. Mert Yildiran
606fddc776 🐛 Fix the log messages related to POST /scripts/env request 2023-03-04 14:12:03 +03:00
M. Mert Yildiran
9a95fa364c Change consts config field to env 2023-03-03 17:32:19 +03:00
M. Mert Yildiran
72a18871b1 Update Hub about the changes in the config in case of kubeshark tap re-run 2023-02-16 20:06:39 +03:00
M. Mert Yildiran
0a00dfbaf5 Add grace period 2023-02-16 00:58:01 +03:00
M. Mert Yildiran
b9a7cfb4fa Reduce DefaultSleep value 2023-02-16 00:49:45 +03:00
M. Mert Yildiran
f0b68cb44b Revert "Create Worker(s) before Hub" 2023-02-15 22:33:59 +03:00
M. Mert Yildiran
dcd42798c7 Establish a proxy in console command if it's not present 2023-02-15 00:56:33 +03:00
M. Mert Yildiran
384f62f94f Create Worker(s) before Hub and reduce the grace periods 2023-02-15 00:43:30 +03:00
M. Mert Yildiran
7312addb65 Change the key into index 2023-02-14 23:10:38 +03:00
M. Mert Yildiran
41ba509428 Add scripts command 2023-02-14 20:23:25 +03:00
M. Mert Yildiran
85da7f71ac Color the error messages in console command 2023-02-09 15:19:05 +03:00
M. Mert Yildiran
6ca727373c Fix the command description style 2023-02-09 15:11:08 +03:00
M. Mert Yildiran
f4c6613e6e Add console command 2023-02-09 15:09:52 +03:00
M. Mert Yildiran
37f615c9bb Add a grace period of a second before calling Hub endpoints 2023-02-07 12:44:32 +03:00
M. Mert Yildiran
ac9298d681 Log the POST to Hub calls even if debug is false 2023-02-07 12:43:31 +03:00
M. Mert Yildiran
2a20dc173c Call POST /scripts/done after the posting the scripts to mark the end 2023-02-07 12:25:41 +03:00
M. Mert Yildiran
a6dd98d241 Log the found scripts 2023-02-06 15:27:07 +03:00
M. Mert Yildiran
5d15667a03 POST storage limit to Hub and better error handling 2023-02-06 07:58:43 +03:00
M. Mert Yildiran
43e9e90f7d ⬆️ Upgrade to github.com/kubeshark/base@v0.6.3 2023-02-05 23:48:59 +03:00
M. Mert Yildiran
776f9ce9af Change consts type to map[string]interface{} 2023-02-05 22:55:02 +03:00
M. Mert Yildiran
e62ebea8d8 Try to load the config YAML from CWD first 2023-02-05 22:36:40 +03:00
M. Mert Yildiran
eee641082e Add ScriptingConfig and read JavaScript files from the source directory using github.com/robertkrimen/otto 2023-02-05 22:00:01 +03:00
M. Mert Yildiran
a1096cf2b9 Comment out GODEBUG=netdns=go environment variable 2023-02-05 09:02:13 +03:00
M. Mert Yildiran
1c31f5df01 🐛 Fix a data race in pod watching 2023-02-05 08:41:53 +03:00
M. Mert Yildiran
6e0f2aa10a 🔧 Add build-race Makefile rule 2023-02-05 08:39:09 +03:00
M. Mert Yildiran
22e8490b93 Add GODEBUG=netdns=go environment variable to worker 2023-02-04 03:27:12 +03:00
M. Mert Yildiran
17e038e703 Do POST /license after Hub is started 2023-01-31 22:31:44 +03:00
M. Mert Yildiran
4cb4ba568b Change a debug log message 2023-01-31 00:02:41 +03:00
M. Mert Yildiran
79cc2e70b6 Revert " Add community, edition and upgrade commands"
This reverts commit 4b2c678fa3.
2023-01-31 00:01:55 +03:00
M. Mert Yildiran
fe2423e9d9 Revert " Pull the images from public ECR repositories in case of the edition is not community"
This reverts commit 964d30bf80.
2023-01-31 00:01:43 +03:00
M. Mert Yildiran
964d30bf80 Pull the images from public ECR repositories in case of the edition is not community 2023-01-30 05:11:42 +03:00
M. Mert Yildiran
4b2c678fa3 Add community, edition and upgrade commands 2023-01-29 23:52:49 +03:00
M. Mert Yildiran
a9fdde4110 🐛 Fix the missing / in image URL 2023-01-29 03:30:23 +03:00
M. Mert Yildiran
4eeab41e6d Don't fail if self namespace does not exist 2023-01-29 01:57:05 +03:00
M. Mert Yildiran
dce7de6cc9 Treat Docker registry as an absolute prefix 2023-01-29 01:56:18 +03:00
M. Mert Yildiran
c809117b2c 🐛 Don't open the Hub URL in the browser (proxy command) 2023-01-26 19:35:21 +03:00
M. Mert Yildiran
2c6c71cf49 Log the worker DaemonSet creation 2023-01-22 05:14:28 +03:00
M. Mert Yildiran
1533d1ec28 Suggest kubeshark proxy at the end of tap 2023-01-22 05:10:29 +03:00
M. Mert Yildiran
edf5c8cd6d Don't call cancel() in case of a proxy/port-forward error 2023-01-22 05:04:44 +03:00
M. Mert Yildiran
6c69fb6bc4 POST pod regex to Hub if Kubeshark is still running 2023-01-22 04:33:30 +03:00
M. Mert Yildiran
4dd941f8d9 Set REACT_APP_DEFAULT_FILTER to empty string 2023-01-22 04:27:05 +03:00
M. Mert Yildiran
90294e32c1 👕 Remove unused getK8sTapManagerErrorText method 2023-01-22 04:26:21 +03:00
M. Mert Yildiran
5f4a856c5e 🐛 Fix the proxy command 2023-01-22 04:25:47 +03:00
M. Mert Yildiran
4e3233ade8 Deploy workers DaemonSet without NodeSelector and call POST /pods/regex to set the pod regex in Hub 2023-01-22 04:14:44 +03:00
M. Mert Yildiran
846f253a03 Don't watch and POST Worker pods to Hub 2023-01-21 03:09:15 +03:00
M. Mert Yildiran
f128ae3993 🔥 Remove config map and image pull checks 2023-01-21 01:32:21 +03:00
Alon Girmonsky
38da25ecc8 Infra information
Lately we've been experiencing bugs resulting from an environment with multiple nodes and the absence of a CNI.
2023-01-18 10:04:47 -08:00
M. Mert Yildiran
bf777f9fca Change the color coding of new, targeted and untargeted pods log messages 2023-01-17 02:09:26 +03:00
M. Mert Yildiran
3c4272c6d1 🐛 Fix the issues in port-forward 2023-01-16 00:06:52 +03:00
Alon Girmonsky
920535b643 protocol-aware 2023-01-15 10:59:16 -08:00
Alon Girmonsky
b55dd5b072 Shorter message bar 2023-01-15 10:47:13 -08:00
Alon Girmonsky
f5cf15d657 Changing the hero message 2023-01-15 10:46:06 -08:00
Alon Girmonsky
fe623438c4 Update the upper message
include DNS and identity-aware service map.
2023-01-14 21:25:00 -08:00
M. Mert Yildiran
1c35a9f143 🐛 Fix the parsing of --proxy-front-port and --proxy-hub-port options 2023-01-15 02:31:47 +03:00
M. Mert Yildiran
d6766a8ac5 🐛 Fix .spec.template.spec.imagePullSecrets: element 0: associative list with keys has an element that omits key field \"name\" error 2023-01-08 21:47:00 +03:00
M. Mert Yildiran
eaa21f0789 🔨 Fix the targetted typo 2023-01-08 21:44:12 +03:00
M. Mert Yildiran
ac6aee07f5 Update README.md 2023-01-07 14:36:26 +03:00
M. Mert Yildiran
5b428c5636 Update README.md 2023-01-07 14:35:12 +03:00
M. Mert Yildiran
894f97ca41 Add an option to set the ImagePullSecrets 2023-01-07 14:20:01 +03:00
Atıl Sensalduz
45f8c8a834 docs: add how to install step for brew (#1286) 2023-01-03 15:14:38 -08:00
Alon Girmonsky
75ff80218b Update README.md 2022-12-31 14:14:19 -08:00
Alon Girmonsky
6255e1d4ef Update README.md 2022-12-31 14:10:32 -08:00
Alon Girmonsky
8ea606fb59 Update README.md 2022-12-31 10:15:05 -08:00
Alon Girmonsky
c9d41c53ca Update README.md 2022-12-30 21:45:13 -08:00
Alon Girmonsky
3995dae16d Update README.md 2022-12-30 21:41:56 -08:00
Alon Girmonsky
447fcf1b31 Change the center message
From Mizu to Distributed PCAP storage
2022-12-30 21:39:29 -08:00
M. Mert Yildiran
7e7fa772e1 🐛 Have a short caller (file:line) log 2022-12-30 08:30:48 +03:00
41 changed files with 1339 additions and 973 deletions

View File

@@ -10,6 +10,9 @@ assignees: ''
**Describe the bug**
A clear and concise description of what the bug is.
**Provide more information**
Running on EKS, AKS, GKE, Minikube, Rancher, OpenShift? Number of Nodes? CNI?
**To Reproduce**
Steps to reproduce the behavior:
1. Run `kubeshark <command> ...`

View File

@@ -26,9 +26,11 @@ jobs:
id: version
shell: bash
run: |
echo "##[set-output name=tag;]$(echo ${GITHUB_REF#refs/*/})"
echo "##[set-output name=build_timestamp;]$(echo $(date +%s))"
echo "##[set-output name=branch;]$(echo ${GITHUB_REF#refs/heads/})"
{
echo "tag=${GITHUB_REF#refs/*/}"
echo "build_timestamp=$(date +%s)"
echo "branch=${GITHUB_REF#refs/heads/}"
} >> "$GITHUB_OUTPUT"
- name: Build
run: make build-all VER='${{ steps.version.outputs.tag }}' BUILD_TIMESTAMP='${{ steps.version.outputs.build_timestamp }}'
@@ -57,6 +59,16 @@ jobs:
with:
fetch-depth: 0
- name: Version
id: version
shell: bash
run: |
{
echo "tag=${GITHUB_REF#refs/*/}"
echo "build_timestamp=$(date +%s)"
echo "branch=${GITHUB_REF#refs/heads/}"
} >> "$GITHUB_OUTPUT"
- name: Fetch all tags
run: git fetch --force --tags
@@ -73,3 +85,5 @@ jobs:
args: release --rm-dist
env:
GITHUB_TOKEN: ${{ secrets.HOMEBREW_TOKEN }}
VER: ${{ steps.version.outputs.tag }}
BUILD_TIMESTAMP: ${{ steps.version.outputs.build_timestamp }}
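
The hunk above migrates off the deprecated `set-output` workflow command: outputs are now written as `name=value` lines to the file referenced by the `GITHUB_OUTPUT` environment variable. The same mechanism works from any process in the job, not just shell. A hedged sketch in Go, for illustration only; the output names mirror the workflow's, the values are placeholders:

```go
package main

import (
	"fmt"
	"os"
	"time"
)

func main() {
	// GITHUB_OUTPUT is set by the Actions runner and points to a file
	// that collects the step's outputs.
	path := os.Getenv("GITHUB_OUTPUT")
	if path == "" {
		fmt.Fprintln(os.Stderr, "not running under GitHub Actions")
		return
	}
	f, err := os.OpenFile(path, os.O_APPEND|os.O_WRONLY, 0o644)
	if err != nil {
		panic(err)
	}
	defer f.Close()
	// Output names mirror the workflow above; values are illustrative.
	fmt.Fprintf(f, "tag=%s\n", "v39.0")
	fmt.Fprintf(f, "build_timestamp=%d\n", time.Now().Unix())
}
```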

.gitignore vendored (7 lines changed)
View File

@@ -56,4 +56,11 @@ cypress.env.json
# Object files
*.o
# Binaries
bin
# Scripts
scripts/
# CWD config YAML
kubeshark.yaml

View File

@@ -1,4 +1,4 @@
![Kubeshark: The API Traffic Viewer for Kubernetes](https://raw.githubusercontent.com/kubeshark/assets/master/svg/kubeshark-logo.svg)
![Kubeshark: The API Traffic Analyzer for Kubernetes](https://raw.githubusercontent.com/kubeshark/assets/master/svg/kubeshark-logo.svg)
# Contributing to Kubeshark

View File

@@ -15,15 +15,23 @@ help: ## Print this help message.
@awk 'BEGIN {FS = ":.*?## "} /^[a-zA-Z_-]+:.*?## / {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}' $(MAKEFILE_LIST)
build-debug: ## Build for debuging.
export CGO_ENABLED=1
export GCLFAGS='-gcflags="all=-N -l"'
${MAKE} build-base
build: ## Build.
export CGO_ENABLED=0
export LDFLAGS_EXT='-extldflags=-static -s -w'
${MAKE} build-base
build-race: ## Build with -race flag.
export CGO_ENABLED=1
export GCLFAGS='-race'
export LDFLAGS_EXT='-extldflags=-static -s -w'
${MAKE} build-base
build-base: ## Build binary (select the platform via GOOS / GOARCH env variables).
CGO_ENABLED=0 go build ${GCLFAGS} -ldflags="${LDFLAGS_EXT} \
go build ${GCLFAGS} -ldflags="${LDFLAGS_EXT} \
-X 'github.com/kubeshark/kubeshark/misc.GitCommitHash=$(COMMIT_HASH)' \
-X 'github.com/kubeshark/kubeshark/misc.Branch=$(GIT_BRANCH)' \
-X 'github.com/kubeshark/kubeshark/misc.BuildTimestamp=$(BUILD_TIMESTAMP)' \
@@ -33,6 +41,7 @@ build-base: ## Build binary (select the platform via GOOS / GOARCH env variables
cd bin && shasum -a 256 kubeshark_${SUFFIX} > kubeshark_${SUFFIX}.sha256
build-all: ## Build for all supported platforms.
export CGO_ENABLED=0
echo "Compiling for every OS and Platform" && \
mkdir -p bin && sed s/_VER_/$(VER)/g RELEASE.md.TEMPLATE > bin/README.md && \
$(MAKE) build GOOS=linux GOARCH=amd64 && \

View File

@@ -1,5 +1,5 @@
<p align="center">
<img src="https://raw.githubusercontent.com/kubeshark/assets/master/svg/kubeshark-logo.svg" alt="Kubeshark: Traffic viewer for Kubernetes." height="128px"/>
<img src="https://raw.githubusercontent.com/kubeshark/assets/master/svg/kubeshark-logo.svg" alt="Kubeshark: Traffic analyzer for Kubernetes." height="128px"/>
</p>
<p align="center">
@@ -23,87 +23,61 @@
</a>
</p>
<p>
<p align="center">
Mizu (by UP9) is now Kubeshark, read more about it <a href="https://www.kubeshark.co/mizu-is-now-kubeshark">here</a>.
<b>
<a href="https://github.com/kubeshark/kubeshark/releases/latest">V38.2</a> is out with PCAP export, <a href="https://docs.kubeshark.co/en/dns">DNS</a>, <a href="https://docs.kubeshark.co/en/service_map">Identity-aware Service Map</a> and so much more. Read about it <a href="https://kubeshark.co/pcap-or-it-didnt-happen">here</a>.
</b>
</p>
Kubeshark, the API Traffic Viewer for kubernetes, provides deep visibility and monitoring of all API traffic and payloads going in, out and across containers and pods inside a Kubernetes cluster.
Think of a combination of Chrome Dev Tools, TCPDump and Wireshark, re-invented for Kubernetes.
**Kubeshark** is an API Traffic Analyzer for [**Kubernetes**](https://kubernetes.io/) providing real-time, protocol-level visibility into Kubernetes internal network, capturing and monitoring all traffic and payloads going in, out and across containers, pods, nodes and clusters.
![Simple UI](https://github.com/kubeshark/assets/raw/master/png/kubeshark-ui.png)
## Download
Think [TCPDump](https://en.wikipedia.org/wiki/Tcpdump) and [Wireshark](https://www.wireshark.org/) re-invented for Kubernetes
Kubeshark uses a ~45MB pre-compiled executable binary to communicate with the Kubernetes API. We recommend downloading the `kubeshark` CLI by using one of these options:
## Getting Started
- Choose the right binary, download and use directly from [the latest stable release](https://github.com/kubeshark/kubeshark/releases/latest).
- Use the shell script below :point_down: to automatically download the right binary for your operating system and CPU architecture:
```shell
sh <(curl -Ls https://kubeshark.co/install)
```
- Compile it from source using the `make` command, then use the `./bin/kubeshark__` executable.
## Run
Use the `kubeshark` CLI to capture and view streaming API traffic in real time.
Download **Kubeshark**'s binary distribution from the [latest release](https://github.com/kubeshark/kubeshark/releases/latest) and run it following one of these examples:
```shell
kubeshark tap
```
### Troubleshooting Installation
If something doesn't work, or simply to play it safe prior to installing:
> Make sure you have access to https://hub.docker.com/
> Make sure the `kubeshark` executable is in your `PATH`.
### Select Pods
#### Monitoring a Specific Pod:
```shell
kubeshark tap catalogue-b87b45784-sxc8q
```
#### Monitoring a Set of Pods Using Regex:
```shell
kubeshark tap "(catalo*|front-end*)"
```
### Specify the Namespace
By default, Kubeshark targets the `default` namespace.
To specify a different namespace:
```
kubeshark tap -n sock-shop
```
### Specify All Namespaces
By default, Kubeshark waits for new pods to be created. To tap all existing namespaces right away, run:
```
kubeshark tap -A
```
```shell
kubeshark tap -n sock-shop "(catalo*|front-end*)"
```
Running any of the :point_up: above commands will open the [Web UI](https://docs.kubeshark.co/en/ui) in your browser which streams the traffic in your Kubernetes cluster in real-time.
### Homebrew
[Homebrew](https://brew.sh/) :beer: users can add Kubeshark formulae with:
```shell
brew tap kubeshark/kubeshark
```
and install Kubeshark CLI with:
```shell
brew install kubeshark
```
## Building From Source
Clone this repository and run the `make` command to build it. After the build is complete, the executable can be found at `./bin/kubeshark__`.
## Documentation
Visit our documentation website: [docs.kubeshark.co](https://docs.kubeshark.co)
The documentation resources are open-source and can be found on GitHub: [kubeshark/docs](https://github.com/kubeshark/docs)
To learn more, read the [documentation](https://docs.kubeshark.co).
## Contributing
We ❤️ pull requests! See [CONTRIBUTING.md](CONTRIBUTING.md) for the contribution guide.
We :heart: pull requests! See [CONTRIBUTING.md](CONTRIBUTING.md) for the contribution guide.
## Code of Conduct

View File

@@ -1,123 +0,0 @@
package check
import (
"context"
"fmt"
"regexp"
"time"
"github.com/kubeshark/kubeshark/docker"
"github.com/kubeshark/kubeshark/kubernetes"
"github.com/kubeshark/kubeshark/misc"
"github.com/kubeshark/kubeshark/utils"
"github.com/rs/zerolog/log"
core "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func ImagePullInCluster(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool {
log.Info().Str("procedure", "image-pull-in-cluster").Msg("Checking:")
namespace := "default"
podName := fmt.Sprintf("%s-test", misc.Program)
defer func() {
if err := kubernetesProvider.RemovePod(ctx, namespace, podName); err != nil {
log.Error().
Str("namespace", namespace).
Str("pod", podName).
Err(err).
Msg("While removing test pod!")
}
}()
if err := createImagePullInClusterPod(ctx, kubernetesProvider, namespace, podName); err != nil {
log.Error().
Str("namespace", namespace).
Str("pod", podName).
Err(err).
Msg("While creating test pod!")
return false
}
if err := checkImagePulled(ctx, kubernetesProvider, namespace, podName); err != nil {
log.Printf("%v cluster is not able to pull %s containers from docker hub, err: %v", misc.Program, fmt.Sprintf(utils.Red, "✗"), err)
log.Error().
Str("namespace", namespace).
Str("pod", podName).
Err(err).
Msg("Unable to pull images from Docker Hub!")
return false
}
log.Info().
Str("namespace", namespace).
Str("pod", podName).
Msg("Pulling images from Docker Hub is passed.")
return true
}
func checkImagePulled(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespace string, podName string) error {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", podName))
podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
eventChan, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{namespace}, podWatchHelper)
timeAfter := time.After(30 * time.Second)
for {
select {
case wEvent, ok := <-eventChan:
if !ok {
eventChan = nil
continue
}
pod, err := wEvent.ToPod()
if err != nil {
return err
}
if pod.Status.Phase == core.PodRunning {
return nil
}
case err, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
return err
case <-timeAfter:
return fmt.Errorf("image not pulled in time")
}
}
}
func createImagePullInClusterPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespace string, podName string) error {
image := docker.GetWorkerImage()
log.Info().Str("image", image).Msg("Testing image pull:")
var zero int64
pod := &core.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
},
Spec: core.PodSpec{
Containers: []core.Container{
{
Name: "probe",
Image: image,
ImagePullPolicy: "Always",
Command: []string{"cat"},
Stdin: true,
},
},
TerminationGracePeriodSeconds: &zero,
},
}
if _, err := kubernetesProvider.CreatePod(ctx, namespace, pod); err != nil {
return err
}
return nil
}

View File

@@ -15,9 +15,6 @@ func KubernetesResources(ctx context.Context, kubernetesProvider *kubernetes.Pro
exist, err := kubernetesProvider.DoesNamespaceExist(ctx, config.Config.SelfNamespace)
allResourcesExist := checkResourceExist(config.Config.SelfNamespace, "namespace", exist, err)
exist, err = kubernetesProvider.DoesConfigMapExist(ctx, config.Config.SelfNamespace, kubernetes.ConfigMapName)
allResourcesExist = checkResourceExist(kubernetes.ConfigMapName, "config map", exist, err) && allResourcesExist
exist, err = kubernetesProvider.DoesServiceAccountExist(ctx, config.Config.SelfNamespace, kubernetes.ServiceAccountName)
allResourcesExist = checkResourceExist(kubernetes.ServiceAccountName, "service account", exist, err) && allResourcesExist

View File

@@ -33,9 +33,6 @@ func runCheck() {
checkPassed = check.KubernetesPermissions(ctx, embedFS, kubernetesProvider)
}
if checkPassed {
checkPassed = check.ImagePullInCluster(ctx, kubernetesProvider)
}
if checkPassed {
checkPassed = check.KubernetesResources(ctx, kubernetesProvider)
}

View File

@@ -10,5 +10,5 @@ func performCleanCommand() {
return
}
finishSelfExecution(kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.SelfNamespace)
finishSelfExecution(kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.SelfNamespace, false)
}

View File

@@ -18,32 +18,32 @@ import (
"github.com/rs/zerolog/log"
)
func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx context.Context, cancel context.CancelFunc, serviceName string, proxyPortLabel string, srcPort uint16, dstPort uint16, healthCheck string) {
httpServer, err := kubernetes.StartProxy(kubernetesProvider, config.Config.Tap.Proxy.Host, srcPort, config.Config.SelfNamespace, serviceName, cancel)
func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx context.Context, serviceName string, podName string, proxyPortLabel string, srcPort uint16, dstPort uint16, healthCheck string) {
httpServer, err := kubernetes.StartProxy(kubernetesProvider, config.Config.Tap.Proxy.Host, srcPort, config.Config.SelfNamespace, serviceName)
if err != nil {
log.Error().
Err(errormessage.FormatError(err)).
Msg(fmt.Sprintf("Error occured while running k8s proxy. Try setting different port by using --%s", proxyPortLabel))
cancel()
Msg(fmt.Sprintf("Error occured while running K8s proxy. Try setting different port using --%s", proxyPortLabel))
return
}
connector := connect.NewConnector(kubernetes.GetLocalhostOnPort(srcPort), connect.DefaultRetries, connect.DefaultTimeout)
if err := connector.TestConnection(healthCheck); err != nil {
log.Error().Msg("Couldn't connect using proxy, stopping proxy and trying to create port-forward..")
log.Warn().
Str("service", serviceName).
Msg("Couldn't connect using proxy, stopping proxy and trying to create port-forward...")
if err := httpServer.Shutdown(ctx); err != nil {
log.Error().
Err(errormessage.FormatError(err)).
Msg("Error occurred while stopping proxy.")
}
podRegex, _ := regexp.Compile(kubernetes.HubPodName)
if _, err := kubernetes.NewPortForward(kubernetesProvider, config.Config.SelfNamespace, podRegex, srcPort, dstPort, ctx, cancel); err != nil {
podRegex, _ := regexp.Compile(podName)
if _, err := kubernetes.NewPortForward(kubernetesProvider, config.Config.SelfNamespace, podRegex, srcPort, dstPort, ctx); err != nil {
log.Error().
Str("pod-regex", podRegex.String()).
Err(errormessage.FormatError(err)).
Msg(fmt.Sprintf("Error occured while running port forward. Try setting different port by using --%s", proxyPortLabel))
cancel()
Msg(fmt.Sprintf("Error occured while running port forward. Try setting different port using --%s", proxyPortLabel))
return
}
@@ -53,7 +53,6 @@ func startProxyReportErrorIfAny(kubernetesProvider *kubernetes.Provider, ctx con
Str("service", serviceName).
Err(errormessage.FormatError(err)).
Msg("Couldn't connect to service.")
cancel()
return
}
}
@@ -97,11 +96,13 @@ func handleKubernetesProviderError(err error) {
}
}
func finishSelfExecution(kubernetesProvider *kubernetes.Provider, isNsRestrictedMode bool, selfNamespace string) {
func finishSelfExecution(kubernetesProvider *kubernetes.Provider, isNsRestrictedMode bool, selfNamespace string, withoutCleanup bool) {
removalCtx, cancel := context.WithTimeout(context.Background(), cleanupTimeout)
defer cancel()
dumpLogsIfNeeded(removalCtx, kubernetesProvider)
resources.CleanUpSelfResources(removalCtx, cancel, kubernetesProvider, isNsRestrictedMode, selfNamespace)
if !withoutCleanup {
resources.CleanUpSelfResources(removalCtx, cancel, kubernetesProvider, isNsRestrictedMode, selfNamespace)
}
}
func dumpLogsIfNeeded(ctx context.Context, kubernetesProvider *kubernetes.Provider) {

View File

@@ -36,7 +36,7 @@ var configCmd = &cobra.Command{
return nil
}
log.Debug().Str("template", template).Msg("Writing template config...")
log.Debug().Str("template", template).Msg("Printing template config...")
fmt.Printf("%v", template)
}

cmd/console.go (new file, 112 lines)
View File

@@ -0,0 +1,112 @@
package cmd
import (
"fmt"
"net/http"
"net/url"
"os"
"os/signal"
"strings"
"time"
"github.com/creasty/defaults"
"github.com/gorilla/websocket"
"github.com/kubeshark/kubeshark/config"
"github.com/kubeshark/kubeshark/config/configStructs"
"github.com/kubeshark/kubeshark/kubernetes"
"github.com/kubeshark/kubeshark/utils"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
)
var consoleCmd = &cobra.Command{
Use: "console",
Short: "Stream the scripting console logs into shell",
RunE: func(cmd *cobra.Command, args []string) error {
runConsole()
return nil
},
}
func init() {
rootCmd.AddCommand(consoleCmd)
defaultTapConfig := configStructs.TapConfig{}
if err := defaults.Set(&defaultTapConfig); err != nil {
log.Debug().Err(err).Send()
}
consoleCmd.Flags().Uint16(configStructs.ProxyHubPortLabel, defaultTapConfig.Proxy.Hub.SrcPort, "Provide a custom port for the Hub")
consoleCmd.Flags().String(configStructs.ProxyHostLabel, defaultTapConfig.Proxy.Host, "Provide a custom host for the Hub")
}
func runConsole() {
hubUrl := kubernetes.GetLocalhostOnPort(config.Config.Tap.Proxy.Hub.SrcPort)
response, err := http.Get(fmt.Sprintf("%s/echo", hubUrl))
if err != nil || response.StatusCode != 200 {
log.Info().Msg(fmt.Sprintf(utils.Yellow, "Couldn't connect to Hub. Establishing proxy..."))
runProxy(false)
}
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
log.Info().Str("host", config.Config.Tap.Proxy.Host).Uint16("port", config.Config.Tap.Proxy.Hub.SrcPort).Msg("Connecting to:")
u := url.URL{
Scheme: "ws",
Host: fmt.Sprintf("%s:%d", config.Config.Tap.Proxy.Host, config.Config.Tap.Proxy.Hub.SrcPort),
Path: "/scripts/logs",
}
c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
if err != nil {
log.Error().Err(err).Send()
return
}
defer c.Close()
done := make(chan struct{})
go func() {
defer close(done)
for {
_, message, err := c.ReadMessage()
if err != nil {
log.Error().Err(err).Send()
return
}
msg := string(message)
if strings.Contains(msg, ":ERROR]") {
msg = fmt.Sprintf(utils.Red, msg)
fmt.Fprintln(os.Stderr, msg)
} else {
fmt.Fprintln(os.Stdout, msg)
}
}
}()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-done:
return
case <-interrupt:
log.Warn().Msg(fmt.Sprintf(utils.Yellow, "Received interrupt, exiting..."))
err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil {
log.Error().Err(err).Send()
return
}
select {
case <-done:
case <-time.After(time.Second):
}
return
}
}
}
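
The console command above connects to the Hub's `/scripts/logs` WebSocket endpoint and echoes script log lines to the shell, coloring errors. A minimal, self-contained sketch of just that read loop, assuming only that the endpoint speaks plain WebSocket text frames; the host and port below are placeholders, not guaranteed defaults:

```go
package main

import (
	"fmt"
	"net/url"

	"github.com/gorilla/websocket"
)

func main() {
	// Placeholder address: wherever the Hub proxy/port-forward is listening.
	u := url.URL{Scheme: "ws", Host: "localhost:8898", Path: "/scripts/logs"}
	c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
	if err != nil {
		panic(err)
	}
	defer c.Close()
	// Print each log frame until the connection closes.
	for {
		_, message, err := c.ReadMessage()
		if err != nil {
			fmt.Println("connection closed:", err)
			return
		}
		fmt.Println(string(message))
	}
}
```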

cmd/pro.go (new file, 128 lines)
View File

@@ -0,0 +1,128 @@
package cmd
import (
"fmt"
"io/ioutil"
"net/http"
"os"
"time"
"github.com/creasty/defaults"
"github.com/gin-gonic/gin"
"github.com/kubeshark/kubeshark/config"
"github.com/kubeshark/kubeshark/config/configStructs"
"github.com/kubeshark/kubeshark/internal/connect"
"github.com/kubeshark/kubeshark/kubernetes"
"github.com/kubeshark/kubeshark/utils"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
)
var proCmd = &cobra.Command{
Use: "pro",
Short: "Acquire a Pro license",
RunE: func(cmd *cobra.Command, args []string) error {
acquireLicense()
return nil
},
}
const (
PRO_URL = "https://console.kubeshark.co"
PRO_PORT = 5252
)
func init() {
rootCmd.AddCommand(proCmd)
defaultTapConfig := configStructs.TapConfig{}
if err := defaults.Set(&defaultTapConfig); err != nil {
log.Debug().Err(err).Send()
}
proCmd.Flags().Uint16(configStructs.ProxyHubPortLabel, defaultTapConfig.Proxy.Hub.SrcPort, "Provide a custom port for the Hub")
proCmd.Flags().String(configStructs.ProxyHostLabel, defaultTapConfig.Proxy.Host, "Provide a custom host for the Hub")
}
func acquireLicense() {
if config.Config.Scripting.Source == "" {
log.Error().Msg("`scripting.source` field is empty.")
return
}
hubUrl := kubernetes.GetLocalhostOnPort(config.Config.Tap.Proxy.Hub.SrcPort)
response, err := http.Get(fmt.Sprintf("%s/echo", hubUrl))
if err != nil || response.StatusCode != 200 {
log.Info().Msg(fmt.Sprintf(utils.Yellow, "Couldn't connect to Hub. Establishing proxy..."))
runProxy(false)
}
connector = connect.NewConnector(kubernetes.GetLocalhostOnPort(config.Config.Tap.Proxy.Hub.SrcPort), connect.DefaultRetries, connect.DefaultTimeout)
log.Info().Str("url", PRO_URL).Msg("Opening in the browser:")
utils.OpenBrowser(PRO_URL)
runLicenseRecieverServer()
}
func updateLicense(licenseKey string) {
config.Config.License = licenseKey
err := config.WriteConfig(&config.Config)
if err != nil {
panic(err)
}
go func() {
connector.PostLicense(config.Config.License)
log.Info().Msg("Updated the license. Exiting.")
time.Sleep(2 * time.Second)
os.Exit(0)
}()
}
func runLicenseRecieverServer() {
gin.SetMode(gin.ReleaseMode)
ginApp := gin.New()
ginApp.Use(func(c *gin.Context) {
c.Writer.Header().Set("Access-Control-Allow-Origin", "*")
c.Writer.Header().Set("Access-Control-Allow-Credentials", "true")
c.Writer.Header().Set("Access-Control-Allow-Headers", "Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization, accept, origin, Cache-Control, X-Requested-With, x-session-token")
c.Writer.Header().Set("Access-Control-Allow-Methods", "POST, OPTIONS, GET, PUT, DELETE")
c.Writer.Header().Set("Access-Control-Expose-Headers", "Content-Disposition")
if c.Request.Method == "OPTIONS" {
c.AbortWithStatus(http.StatusNoContent)
return
}
c.Next()
})
ginApp.POST("/", func(c *gin.Context) {
data, err := ioutil.ReadAll(c.Request.Body)
if err != nil {
panic(err)
}
licenseKey := string(data)
log.Info().Str("key", licenseKey).Msg("Received license:")
updateLicense(licenseKey)
})
go func() {
if err := ginApp.Run(fmt.Sprintf(":%d", PRO_PORT)); err != nil {
panic(err)
}
}()
log.Info().Msg("Alternatively enter your license key:")
var licenseKey string
fmt.Scanf("%s", &licenseKey)
updateLicense(licenseKey)
}
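
The `pro` command opens the license console in the browser and then waits for the key either on stdin or via a small localhost HTTP callback on port 5252 (the `PRO_PORT` constant above). A simplified stand-in for that receiver using only `net/http` instead of gin; it only receives and prints the key, whereas the real command also writes it into the config and POSTs it to the Hub:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		// The license console POSTs cross-origin, so allow it.
		w.Header().Set("Access-Control-Allow-Origin", "*")
		if r.Method != http.MethodPost {
			// Answer the CORS preflight (and anything else) with 204.
			w.WriteHeader(http.StatusNoContent)
			return
		}
		key, err := io.ReadAll(r.Body)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		fmt.Println("received license key:", string(key))
	})
	panic(http.ListenAndServe(":5252", nil))
}
```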

View File

@@ -9,9 +9,9 @@ import (
var proxyCmd = &cobra.Command{
Use: "proxy",
Short: "Open the web UI (front-end) in the browser via proxy/port-forward.",
Short: "Open the web UI (front-end) in the browser via proxy/port-forward",
RunE: func(cmd *cobra.Command, args []string) error {
runProxy()
runProxy(true)
return nil
},
}
@@ -24,7 +24,7 @@ func init() {
log.Debug().Err(err).Send()
}
proxyCmd.Flags().Uint16(configStructs.ProxyFrontPortLabel, defaultTapConfig.Proxy.Front.SrcPort, "Provide a custom port for the front-end proxy/port-forward.")
proxyCmd.Flags().Uint16(configStructs.ProxyHubPortLabel, defaultTapConfig.Proxy.Hub.SrcPort, "Provide a custom port for the Hub proxy/port-forward.")
proxyCmd.Flags().String(configStructs.ProxyHostLabel, defaultTapConfig.Proxy.Host, "Provide a custom host for the proxy/port-forward.")
proxyCmd.Flags().Uint16(configStructs.ProxyFrontPortLabel, defaultTapConfig.Proxy.Front.SrcPort, "Provide a custom port for the front-end proxy/port-forward")
proxyCmd.Flags().Uint16(configStructs.ProxyHubPortLabel, defaultTapConfig.Proxy.Hub.SrcPort, "Provide a custom port for the Hub proxy/port-forward")
proxyCmd.Flags().String(configStructs.ProxyHostLabel, defaultTapConfig.Proxy.Host, "Provide a custom host for the proxy/port-forward")
}

View File

@@ -14,7 +14,7 @@ import (
"github.com/rs/zerolog/log"
)
func runProxy() {
func runProxy(block bool) {
kubernetesProvider, err := getKubernetesProviderForCli()
if err != nil {
return
@@ -26,7 +26,7 @@ func runProxy() {
exists, err := kubernetesProvider.DoesServiceExist(ctx, config.Config.SelfNamespace, kubernetes.FrontServiceName)
if err != nil {
log.Error().
Str("service", misc.Program).
Str("service", kubernetes.FrontServiceName).
Err(err).
Msg("Failed to found service!")
cancel()
@@ -42,36 +42,96 @@ func runProxy() {
return
}
url := kubernetes.GetLocalhostOnPort(config.Config.Tap.Proxy.Front.SrcPort)
exists, err = kubernetesProvider.DoesServiceExist(ctx, config.Config.SelfNamespace, kubernetes.HubServiceName)
if err != nil {
log.Error().
Str("service", kubernetes.HubServiceName).
Err(err).
Msg("Failed to found service!")
cancel()
return
}
response, err := http.Get(fmt.Sprintf("%s/", url))
if !exists {
log.Error().
Str("service", kubernetes.HubServiceName).
Str("command", fmt.Sprintf("%s %s", misc.Program, tapCmd.Use)).
Msg("Service not found! You should run the command first:")
cancel()
return
}
var establishedProxy bool
hubUrl := kubernetes.GetLocalhostOnPort(config.Config.Tap.Proxy.Hub.SrcPort)
response, err := http.Get(fmt.Sprintf("%s/echo", hubUrl))
if err == nil && response.StatusCode == 200 {
log.Info().
Str("service", kubernetes.HubServiceName).
Int("port", int(config.Config.Tap.Proxy.Hub.SrcPort)).
Msg("Found a running service.")
okToOpen("Hub", hubUrl, true)
} else {
startProxyReportErrorIfAny(
kubernetesProvider,
ctx,
kubernetes.HubServiceName,
kubernetes.HubPodName,
configStructs.ProxyHubPortLabel,
config.Config.Tap.Proxy.Hub.SrcPort,
config.Config.Tap.Proxy.Hub.DstPort,
"/echo",
)
connector := connect.NewConnector(hubUrl, connect.DefaultRetries, connect.DefaultTimeout)
if err := connector.TestConnection("/echo"); err != nil {
log.Error().Msg(fmt.Sprintf(utils.Red, "Couldn't connect to Hub."))
return
}
establishedProxy = true
okToOpen("Hub", hubUrl, true)
}
frontUrl := kubernetes.GetLocalhostOnPort(config.Config.Tap.Proxy.Front.SrcPort)
response, err = http.Get(fmt.Sprintf("%s/", frontUrl))
if err == nil && response.StatusCode == 200 {
log.Info().
Str("service", kubernetes.FrontServiceName).
Int("port", int(config.Config.Tap.Proxy.Front.SrcPort)).
Msg("Found a running service.")
okToOpen(url)
return
}
log.Info().Msg("Establishing connection to K8s cluster...")
startProxyReportErrorIfAny(kubernetesProvider, ctx, cancel, kubernetes.FrontServiceName, configStructs.ProxyFrontPortLabel, config.Config.Tap.Proxy.Front.SrcPort, config.Config.Tap.Proxy.Front.DstPort, "")
okToOpen("Kubeshark", frontUrl, false)
} else {
startProxyReportErrorIfAny(
kubernetesProvider,
ctx,
kubernetes.FrontServiceName,
kubernetes.FrontPodName,
configStructs.ProxyFrontPortLabel,
config.Config.Tap.Proxy.Front.SrcPort,
config.Config.Tap.Proxy.Front.DstPort,
"",
)
connector := connect.NewConnector(frontUrl, connect.DefaultRetries, connect.DefaultTimeout)
if err := connector.TestConnection(""); err != nil {
log.Error().Msg(fmt.Sprintf(utils.Red, "Couldn't connect to Front."))
return
}
connector := connect.NewConnector(url, connect.DefaultRetries, connect.DefaultTimeout)
if err := connector.TestConnection(""); err != nil {
log.Error().Msg(fmt.Sprintf(utils.Red, "Couldn't connect to Front."))
return
establishedProxy = true
okToOpen("Kubeshark", frontUrl, false)
}
okToOpen(url)
utils.WaitForTermination(ctx, cancel)
if establishedProxy && block {
utils.WaitForTermination(ctx, cancel)
}
}
func okToOpen(url string) {
log.Info().Str("url", url).Msg(fmt.Sprintf(utils.Green, fmt.Sprintf("%s is available at:", misc.Software)))
func okToOpen(name string, url string, noBrowser bool) {
log.Info().Str("url", url).Msg(fmt.Sprintf(utils.Green, fmt.Sprintf("%s is available at:", name)))
if !config.Config.HeadlessMode {
if !config.Config.HeadlessMode && !noBrowser {
utils.OpenBrowser(url)
}
}

View File

@@ -12,8 +12,8 @@ import (
var rootCmd = &cobra.Command{
Use: "kubeshark",
Short: fmt.Sprintf("%s: The API Traffic Viewer for Kubernetes", misc.Software),
Long: fmt.Sprintf(`%s: The API Traffic Viewer for Kubernetes
Short: fmt.Sprintf("%s: The API Traffic Analyzer for Kubernetes", misc.Software),
Long: fmt.Sprintf(`%s: The API Traffic Analyzer for Kubernetes
An extensible Kubernetes-aware network sniffer and kernel tracer.
For more info: %s`, misc.Software, misc.Website),
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
@@ -33,7 +33,7 @@ func init() {
rootCmd.PersistentFlags().StringSlice(config.SetCommandName, []string{}, fmt.Sprintf("Override values using --%s", config.SetCommandName))
rootCmd.PersistentFlags().String(config.ConfigFilePathCommandName, defaultConfig.ConfigFilePath, fmt.Sprintf("Override config file path using --%s", config.ConfigFilePathCommandName))
rootCmd.PersistentFlags().BoolP(config.DebugFlag, "d", false, "Enable debug mode.")
rootCmd.PersistentFlags().BoolP(config.DebugFlag, "d", false, "Enable debug mode")
}
// Execute adds all child commands to the root command and sets flags appropriately.

cmd/scripts.go (new file, 152 lines)
View File

@@ -0,0 +1,152 @@
package cmd
import (
"context"
"fmt"
"net/http"
"github.com/creasty/defaults"
"github.com/fsnotify/fsnotify"
"github.com/kubeshark/kubeshark/config"
"github.com/kubeshark/kubeshark/config/configStructs"
"github.com/kubeshark/kubeshark/internal/connect"
"github.com/kubeshark/kubeshark/kubernetes"
"github.com/kubeshark/kubeshark/misc"
"github.com/kubeshark/kubeshark/utils"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
)
var scriptsCmd = &cobra.Command{
Use: "scripts",
Short: "Watch the `scripting.source` directory for changes and update the scripts",
RunE: func(cmd *cobra.Command, args []string) error {
runScripts()
return nil
},
}
func init() {
rootCmd.AddCommand(scriptsCmd)
defaultTapConfig := configStructs.TapConfig{}
if err := defaults.Set(&defaultTapConfig); err != nil {
log.Debug().Err(err).Send()
}
scriptsCmd.Flags().Uint16(configStructs.ProxyHubPortLabel, defaultTapConfig.Proxy.Hub.SrcPort, "Provide a custom port for the Hub")
scriptsCmd.Flags().String(configStructs.ProxyHostLabel, defaultTapConfig.Proxy.Host, "Provide a custom host for the Hub")
}
func runScripts() {
if config.Config.Scripting.Source == "" {
log.Error().Msg("`scripting.source` field is empty.")
return
}
hubUrl := kubernetes.GetLocalhostOnPort(config.Config.Tap.Proxy.Hub.SrcPort)
response, err := http.Get(fmt.Sprintf("%s/echo", hubUrl))
if err != nil || response.StatusCode != 200 {
log.Info().Msg(fmt.Sprintf(utils.Yellow, "Couldn't connect to Hub. Establishing proxy..."))
runProxy(false)
}
connector = connect.NewConnector(kubernetes.GetLocalhostOnPort(config.Config.Tap.Proxy.Hub.SrcPort), connect.DefaultRetries, connect.DefaultTimeout)
watchScripts(true)
}
func watchScripts(block bool) {
files := make(map[string]int64)
scripts, err := config.Config.Scripting.GetScripts()
if err != nil {
log.Error().Err(err).Send()
return
}
for _, script := range scripts {
index, err := connector.PostScript(script)
if err != nil {
log.Error().Err(err).Send()
return
}
files[script.Path] = index
}
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Error().Err(err).Send()
return
}
if block {
defer watcher.Close()
}
go func() {
for {
select {
// watch for events
case event := <-watcher.Events:
switch event.Op {
case fsnotify.Create:
script, err := misc.ReadScriptFile(event.Name)
if err != nil {
log.Error().Err(err).Send()
return
}
index, err := connector.PostScript(script)
if err != nil {
log.Error().Err(err).Send()
return
}
files[script.Path] = index
case fsnotify.Write:
index := files[event.Name]
script, err := misc.ReadScriptFile(event.Name)
if err != nil {
log.Error().Err(err).Send()
return
}
err = connector.PutScript(script, index)
if err != nil {
log.Error().Err(err).Send()
return
}
case fsnotify.Rename:
index := files[event.Name]
err := connector.DeleteScript(index)
if err != nil {
log.Error().Err(err).Send()
return
}
default:
// pass
}
// watch for errors
case err := <-watcher.Errors:
log.Error().Err(err).Send()
}
}
}()
if err := watcher.Add(config.Config.Scripting.Source); err != nil {
log.Error().Err(err).Send()
}
log.Info().Str("directory", config.Config.Scripting.Source).Msg("Watching scripts against changes:")
if block {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
utils.WaitForTermination(ctx, cancel)
}
}
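
`watchScripts` above maps each script path to the index the Hub returns, then mirrors filesystem events to POST/PUT/DELETE calls against the Hub. A stripped-down sketch of just the fsnotify half; the Hub calls are elided and `./scripts` is a placeholder path:

```go
package main

import (
	"log"

	"github.com/fsnotify/fsnotify"
)

func main() {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		log.Fatal(err)
	}
	defer watcher.Close()
	if err := watcher.Add("./scripts"); err != nil {
		log.Fatal(err)
	}
	for {
		select {
		case event, ok := <-watcher.Events:
			if !ok {
				return
			}
			// Mirrors the switch in cmd/scripts.go: create/write/rename
			// map to POST/PUT/DELETE against the Hub (elided here).
			switch event.Op {
			case fsnotify.Create:
				log.Println("would POST new script:", event.Name)
			case fsnotify.Write:
				log.Println("would PUT updated script:", event.Name)
			case fsnotify.Rename:
				log.Println("would DELETE script:", event.Name)
			}
		case err, ok := <-watcher.Errors:
			if !ok {
				return
			}
			log.Println("watch error:", err)
		}
	}
}
```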

View File

@@ -15,8 +15,7 @@ import (
var tapCmd = &cobra.Command{
Use: "tap [POD REGEX]",
Short: "Capture the network traffic in your Kubernetes cluster.",
Long: "Capture the network traffic in your Kubernetes cluster.",
Short: "Capture the network traffic in your Kubernetes cluster",
RunE: func(cmd *cobra.Command, args []string) error {
tap()
return nil
@@ -44,17 +43,19 @@ func init() {
log.Debug().Err(err).Send()
}
tapCmd.Flags().StringP(configStructs.DockerRegistryLabel, "r", defaultTapConfig.Docker.Registry, "The Docker registry that's hosting the images.")
tapCmd.Flags().StringP(configStructs.DockerTagLabel, "t", defaultTapConfig.Docker.Tag, "The tag of the Docker images that are going to be pulled.")
tapCmd.Flags().Uint16(configStructs.ProxyFrontPortLabel, defaultTapConfig.Proxy.Front.SrcPort, "Provide a custom port for the front-end proxy/port-forward.")
tapCmd.Flags().Uint16(configStructs.ProxyHubPortLabel, defaultTapConfig.Proxy.Hub.SrcPort, "Provide a custom port for the Hub proxy/port-forward.")
tapCmd.Flags().String(configStructs.ProxyHostLabel, defaultTapConfig.Proxy.Host, "Provide a custom host for the proxy/port-forward.")
tapCmd.Flags().StringSliceP(configStructs.NamespacesLabel, "n", defaultTapConfig.Namespaces, "Namespaces selector.")
tapCmd.Flags().BoolP(configStructs.AllNamespacesLabel, "A", defaultTapConfig.AllNamespaces, "Tap all namespaces.")
tapCmd.Flags().StringP(configStructs.DockerRegistryLabel, "r", defaultTapConfig.Docker.Registry, "The Docker registry that's hosting the images")
tapCmd.Flags().StringP(configStructs.DockerTagLabel, "t", defaultTapConfig.Docker.Tag, "The tag of the Docker images that are going to be pulled")
tapCmd.Flags().String(configStructs.DockerImagePullPolicy, defaultTapConfig.Docker.ImagePullPolicy, "ImagePullPolicy for the Docker images")
tapCmd.Flags().StringSlice(configStructs.DockerImagePullSecrets, defaultTapConfig.Docker.ImagePullSecrets, "ImagePullSecrets for the Docker images")
tapCmd.Flags().Uint16(configStructs.ProxyFrontPortLabel, defaultTapConfig.Proxy.Front.SrcPort, "Provide a custom port for the front-end proxy/port-forward")
tapCmd.Flags().Uint16(configStructs.ProxyHubPortLabel, defaultTapConfig.Proxy.Hub.SrcPort, "Provide a custom port for the Hub proxy/port-forward")
tapCmd.Flags().String(configStructs.ProxyHostLabel, defaultTapConfig.Proxy.Host, "Provide a custom host for the proxy/port-forward")
tapCmd.Flags().StringSliceP(configStructs.NamespacesLabel, "n", defaultTapConfig.Namespaces, "Namespaces selector")
tapCmd.Flags().BoolP(configStructs.AllNamespacesLabel, "A", defaultTapConfig.AllNamespaces, "Tap all namespaces")
tapCmd.Flags().String(configStructs.StorageLimitLabel, defaultTapConfig.StorageLimit, "Override the default storage limit. (per node)")
tapCmd.Flags().Bool(configStructs.DryRunLabel, defaultTapConfig.DryRun, "Preview of all pods matching the regex, without tapping them.")
tapCmd.Flags().StringP(configStructs.PcapLabel, "p", defaultTapConfig.Pcap, fmt.Sprintf("Capture from a PCAP snapshot of %s (.tar.gz) using your Docker Daemon instead of Kubernetes.", misc.Software))
tapCmd.Flags().Bool(configStructs.ServiceMeshLabel, defaultTapConfig.ServiceMesh, "Capture the encrypted traffic if the cluster is configured with a service mesh and with mTLS.")
tapCmd.Flags().Bool(configStructs.TlsLabel, defaultTapConfig.Tls, "Capture the traffic that's encrypted with OpenSSL or Go crypto/tls libraries.")
tapCmd.Flags().Bool(configStructs.DebugLabel, defaultTapConfig.Debug, "Enable the debug mode.")
tapCmd.Flags().Bool(configStructs.DryRunLabel, defaultTapConfig.DryRun, "Preview of all pods matching the regex, without tapping them")
tapCmd.Flags().StringP(configStructs.PcapLabel, "p", defaultTapConfig.Pcap, fmt.Sprintf("Capture from a PCAP snapshot of %s (.tar.gz) using your Docker Daemon instead of Kubernetes", misc.Software))
tapCmd.Flags().Bool(configStructs.ServiceMeshLabel, defaultTapConfig.ServiceMesh, "Capture the encrypted traffic if the cluster is configured with a service mesh and with mTLS")
tapCmd.Flags().Bool(configStructs.TlsLabel, defaultTapConfig.Tls, "Capture the traffic that's encrypted with OpenSSL or Go crypto/tls libraries")
tapCmd.Flags().Bool(configStructs.DebugLabel, defaultTapConfig.Debug, "Enable the debug mode")
}

View File

@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"regexp"
"sync"
"time"
"github.com/kubeshark/kubeshark/docker"
@@ -34,11 +35,18 @@ type tapState struct {
var state tapState
var connector *connect.Connector
var hubPodReady bool
var frontPodReady bool
var proxyDone bool
type Readiness struct {
Hub bool
Front bool
Proxy bool
sync.Mutex
}
var ready *Readiness
func tap() {
ready = &Readiness{}
state.startTime = time.Now()
docker.SetRegistry(config.Config.Tap.Docker.Registry)
docker.SetTag(config.Config.Tap.Docker.Tag)
@@ -71,9 +79,9 @@ func tap() {
}
}
log.Info().Strs("namespaces", state.targetNamespaces).Msg("Targetting pods in:")
log.Info().Strs("namespaces", state.targetNamespaces).Msg("Targeting pods in:")
if err := printTargettedPodsPreview(ctx, kubernetesProvider, state.targetNamespaces); err != nil {
if err := printTargetedPodsPreview(ctx, kubernetesProvider, state.targetNamespaces); err != nil {
log.Error().Err(errormessage.FormatError(err)).Msg("Error listing pods!")
}
@@ -82,10 +90,13 @@ func tap() {
}
log.Info().Msg(fmt.Sprintf("Waiting for the creation of %s resources...", misc.Software))
if state.selfServiceAccountExists, err = resources.CreateHubResources(ctx, kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.SelfNamespace, config.Config.Tap.Resources.Hub, config.Config.ImagePullPolicy(), config.Config.Tap.Debug); err != nil {
if state.selfServiceAccountExists, err = resources.CreateHubResources(ctx, kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.SelfNamespace, config.Config.Tap.Resources.Hub, config.Config.ImagePullPolicy(), config.Config.ImagePullSecrets(), config.Config.Tap.Debug); err != nil {
var statusError *k8serrors.StatusError
if errors.As(err, &statusError) && (statusError.ErrStatus.Reason == metav1.StatusReasonAlreadyExists) {
log.Warn().Msg(fmt.Sprintf("%s is already running in this namespace, change the `selfnamespace` configuration or run `%s clean` to remove the currently running %s instance", misc.Software, misc.Program, misc.Software))
log.Info().Msg(fmt.Sprintf("%s is already running in this namespace, change the `selfnamespace` configuration or run `%s clean` to remove the currently running %s instance.", misc.Software, misc.Program, misc.Software))
postHubStarted(ctx, kubernetesProvider, cancel, true)
log.Info().Msg("Updated Hub about the changes in the config. Exiting.")
printProxyCommandSuggestion()
} else {
defer resources.CleanUpSelfResources(ctx, cancel, kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.SelfNamespace)
log.Error().Err(errormessage.FormatError(err)).Msg("Error creating resources!")
@@ -102,10 +113,17 @@ func tap() {
// block until exit signal or error
utils.WaitForTermination(ctx, cancel)
printProxyCommandSuggestion()
}
func printProxyCommandSuggestion() {
log.Warn().
Str("command", fmt.Sprintf("%s proxy", misc.Program)).
Msg(fmt.Sprintf(utils.Yellow, "To re-establish a proxy/port-forward, run:"))
}
func finishTapExecution(kubernetesProvider *kubernetes.Provider) {
finishSelfExecution(kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.SelfNamespace)
finishSelfExecution(kubernetesProvider, config.Config.IsNsRestrictedMode(), config.Config.SelfNamespace, true)
}
/*
@@ -113,69 +131,20 @@ This function is a bit problematic as it might be detached from the actual pods
The alternative would be to wait for Hub to be ready and then query it for the pods it listens to, this has
the arguably worse drawback of taking a relatively very long time before the user sees which pods are targeted, if any.
*/
func printTargettedPodsPreview(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespaces []string) error {
func printTargetedPodsPreview(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespaces []string) error {
if matchingPods, err := kubernetesProvider.ListAllRunningPodsMatchingRegex(ctx, config.Config.Tap.PodRegex(), namespaces); err != nil {
return err
} else {
if len(matchingPods) == 0 {
printNoPodsFoundSuggestion(namespaces)
}
for _, targettedPod := range matchingPods {
log.Info().Msg(fmt.Sprintf("New pod: %s", fmt.Sprintf(utils.Green, targettedPod.Name)))
for _, targetedPod := range matchingPods {
log.Info().Msg(fmt.Sprintf("Targeted pod: %s", fmt.Sprintf(utils.Green, targetedPod.Name)))
}
return nil
}
}
func startWorkerSyncer(ctx context.Context, cancel context.CancelFunc, provider *kubernetes.Provider, targetNamespaces []string, startTime time.Time) error {
workerSyncer, err := kubernetes.CreateAndStartWorkerSyncer(ctx, provider, kubernetes.WorkerSyncerConfig{
TargetNamespaces: targetNamespaces,
PodFilterRegex: *config.Config.Tap.PodRegex(),
SelfNamespace: config.Config.SelfNamespace,
WorkerResources: config.Config.Tap.Resources.Worker,
ImagePullPolicy: config.Config.ImagePullPolicy(),
SelfServiceAccountExists: state.selfServiceAccountExists,
ServiceMesh: config.Config.Tap.ServiceMesh,
Tls: config.Config.Tap.Tls,
Debug: config.Config.Tap.Debug,
}, startTime)
if err != nil {
return err
}
go func() {
for {
select {
case syncerErr, ok := <-workerSyncer.ErrorOut:
if !ok {
log.Debug().Msg("workerSyncer err channel closed, ending listener loop")
return
}
log.Error().Msg(getK8sTapManagerErrorText(syncerErr))
cancel()
case _, ok := <-workerSyncer.TapPodChangesOut:
if !ok {
log.Debug().Msg("workerSyncer pod changes channel closed, ending listener loop")
return
}
go connector.PostTargettedPodsToHub(workerSyncer.CurrentlyTargettedPods)
case pod, ok := <-workerSyncer.WorkerPodsChanges:
if !ok {
log.Debug().Msg("workerSyncer worker status changed channel closed, ending listener loop")
return
}
go connector.PostWorkerPodToHub(pod)
case <-ctx.Done():
log.Debug().Msg("workerSyncer event listener loop exiting due to context done")
return
}
}
}()
return nil
}
func printNoPodsFoundSuggestion(targetNamespaces []string) {
var suggestionStr string
if !utils.Contains(targetNamespaces, kubernetes.K8sAllNamespaces) {
@@ -184,19 +153,6 @@ func printNoPodsFoundSuggestion(targetNamespaces []string) {
log.Warn().Msg(fmt.Sprintf("Did not find any currently running pods that match the regex argument, %s will automatically target matching pods if any are created later%s", misc.Software, suggestionStr))
}
func getK8sTapManagerErrorText(err kubernetes.K8sTapManagerError) string {
switch err.TapManagerReason {
case kubernetes.TapManagerPodListError:
return fmt.Sprintf("Failed to update currently targetted pods: %v", err.OriginalError)
case kubernetes.TapManagerPodWatchError:
return fmt.Sprintf("Error occured in K8s pod watch: %v", err.OriginalError)
case kubernetes.TapManagerWorkerUpdateError:
return fmt.Sprintf("Error updating worker: %v", err.OriginalError)
default:
return fmt.Sprintf("Unknown error occured in K8s tap manager: %v", err.OriginalError)
}
}
func watchHubPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", kubernetes.HubPodName))
podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
@@ -214,9 +170,9 @@ func watchHubPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, c
switch wEvent.Type {
case kubernetes.EventAdded:
log.Info().Str("pod", kubernetes.HubPodName).Msg("Added pod.")
log.Info().Str("pod", kubernetes.HubPodName).Msg("Added:")
case kubernetes.EventDeleted:
log.Info().Str("pod", kubernetes.HubPodName).Msg("Removed pod.")
log.Info().Str("pod", kubernetes.HubPodName).Msg("Removed:")
cancel()
return
case kubernetes.EventModified:
@@ -235,12 +191,23 @@ func watchHubPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, c
if modifiedPod.Status.Phase == core.PodRunning && !isPodReady {
isPodReady = true
hubPodReady = true
postHubStarted(ctx, kubernetesProvider, cancel)
ready.Lock()
ready.Hub = true
ready.Unlock()
postHubStarted(ctx, kubernetesProvider, cancel, false)
}
ready.Lock()
proxyDone := ready.Proxy
hubPodReady := ready.Hub
frontPodReady := ready.Front
ready.Unlock()
if !proxyDone && hubPodReady && frontPodReady {
proxyDone = true
ready.Lock()
ready.Proxy = true
ready.Unlock()
postFrontStarted(ctx, kubernetesProvider, cancel)
}
case kubernetes.EventBookmark:
@@ -294,9 +261,9 @@ func watchFrontPod(ctx context.Context, kubernetesProvider *kubernetes.Provider,
switch wEvent.Type {
case kubernetes.EventAdded:
log.Info().Str("pod", kubernetes.FrontPodName).Msg("Added pod.")
log.Info().Str("pod", kubernetes.FrontPodName).Msg("Added:")
case kubernetes.EventDeleted:
log.Info().Str("pod", kubernetes.FrontPodName).Msg("Removed pod.")
log.Info().Str("pod", kubernetes.FrontPodName).Msg("Removed:")
cancel()
return
case kubernetes.EventModified:
@@ -315,11 +282,21 @@ func watchFrontPod(ctx context.Context, kubernetesProvider *kubernetes.Provider,
if modifiedPod.Status.Phase == core.PodRunning && !isPodReady {
isPodReady = true
frontPodReady = true
ready.Lock()
ready.Front = true
ready.Unlock()
}
ready.Lock()
proxyDone := ready.Proxy
hubPodReady := ready.Hub
frontPodReady := ready.Front
ready.Unlock()
if !proxyDone && hubPodReady && frontPodReady {
proxyDone = true
ready.Lock()
ready.Proxy = true
ready.Unlock()
postFrontStarted(ctx, kubernetesProvider, cancel)
}
case kubernetes.EventBookmark:
@@ -425,20 +402,91 @@ func watchHubEvents(ctx context.Context, kubernetesProvider *kubernetes.Provider
}
}
func postHubStarted(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
startProxyReportErrorIfAny(kubernetesProvider, ctx, cancel, kubernetes.HubServiceName, configStructs.ProxyFrontPortLabel, config.Config.Tap.Proxy.Hub.SrcPort, config.Config.Tap.Proxy.Hub.DstPort, "/echo")
func postHubStarted(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, update bool) {
startProxyReportErrorIfAny(
kubernetesProvider,
ctx,
kubernetes.HubServiceName,
kubernetes.HubPodName,
configStructs.ProxyHubPortLabel,
config.Config.Tap.Proxy.Hub.SrcPort,
config.Config.Tap.Proxy.Hub.DstPort,
"/echo",
)
if err := startWorkerSyncer(ctx, cancel, kubernetesProvider, state.targetNamespaces, state.startTime); err != nil {
log.Error().Err(errormessage.FormatError(err)).Msg("Error starting worker syncer")
cancel()
if !update {
// Create workers
err := kubernetes.CreateWorkers(
kubernetesProvider,
state.selfServiceAccountExists,
ctx,
config.Config.SelfNamespace,
config.Config.Tap.Resources.Worker,
config.Config.ImagePullPolicy(),
config.Config.ImagePullSecrets(),
config.Config.Tap.ServiceMesh,
config.Config.Tap.Tls,
config.Config.Tap.Debug,
)
if err != nil {
log.Error().Err(err).Send()
}
// Grace period
log.Info().Msg("Waiting for worker containers...")
time.Sleep(5 * time.Second)
}
url := kubernetes.GetLocalhostOnPort(config.Config.Tap.Proxy.Hub.SrcPort)
log.Info().Str("url", url).Msg(fmt.Sprintf(utils.Green, "Hub is available at:"))
// Storage limit
connector.PostStorageLimitToHub(config.Config.Tap.StorageLimitBytes())
// Pod regex
connector.PostRegexToHub(config.Config.Tap.PodRegexStr, state.targetNamespaces)
// License
if config.Config.License != "" {
connector.PostLicense(config.Config.License)
}
// Scripting
connector.PostEnv(config.Config.Scripting.Env)
scripts, err := config.Config.Scripting.GetScripts()
if err != nil {
log.Error().Err(err).Send()
}
for _, script := range scripts {
_, err = connector.PostScript(script)
if err != nil {
log.Error().Err(err).Send()
}
}
connector.PostScriptDone()
if !update {
// Hub proxy URL
url := kubernetes.GetLocalhostOnPort(config.Config.Tap.Proxy.Hub.SrcPort)
log.Info().Str("url", url).Msg(fmt.Sprintf(utils.Green, "Hub is available at:"))
}
if config.Config.Scripting.Source != "" && config.Config.Scripting.WatchScripts {
watchScripts(false)
}
}
func postFrontStarted(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
startProxyReportErrorIfAny(kubernetesProvider, ctx, cancel, kubernetes.FrontServiceName, configStructs.ProxyHubPortLabel, config.Config.Tap.Proxy.Front.SrcPort, config.Config.Tap.Proxy.Front.DstPort, "")
startProxyReportErrorIfAny(
kubernetesProvider,
ctx,
kubernetes.FrontServiceName,
kubernetes.FrontPodName,
configStructs.ProxyFrontPortLabel,
config.Config.Tap.Proxy.Front.SrcPort,
config.Config.Tap.Proxy.Front.DstPort,
"",
)
url := kubernetes.GetLocalhostOnPort(config.Config.Tap.Proxy.Front.SrcPort)
log.Info().Str("url", url).Msg(fmt.Sprintf(utils.Green, fmt.Sprintf("%s is available at:", misc.Software)))
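
The new `Readiness` struct above replaces three loose globals with mutex-guarded fields, though the pattern in the watchers still reads the flags in one critical section and sets `Proxy` in another. A hedged variant that folds the check and the update into a single locked method; the helper name is mine, not the repo's:

```go
package main

import (
	"fmt"
	"sync"
)

// Same fields as the Readiness struct in the diff above.
type Readiness struct {
	Hub   bool
	Front bool
	Proxy bool
	sync.Mutex
}

// TryStartProxy is a hypothetical helper: it checks and flips Proxy inside
// one critical section, so two pod watchers can't both observe Proxy == false
// and start the front proxy twice.
func (r *Readiness) TryStartProxy() bool {
	r.Lock()
	defer r.Unlock()
	if r.Proxy || !r.Hub || !r.Front {
		return false
	}
	r.Proxy = true
	return true
}

func main() {
	ready := &Readiness{Hub: true, Front: true}
	fmt.Println(ready.TryStartProxy()) // true: first caller wins
	fmt.Println(ready.TryStartProxy()) // false: proxy already started
}
```

With this, both `watchHubPod` and `watchFrontPod` could gate `postFrontStarted` on a single call, with no window between the read and the write.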

View File

@@ -5,11 +5,13 @@ import (
"fmt"
"io"
"os"
"path/filepath"
"reflect"
"strconv"
"strings"
"github.com/creasty/defaults"
"github.com/kubeshark/kubeshark/misc"
"github.com/kubeshark/kubeshark/misc/version"
"github.com/kubeshark/kubeshark/utils"
"github.com/rs/zerolog"
@@ -28,9 +30,10 @@ const (
)
var (
Config ConfigStruct
DebugMode bool
cmdName string
Config ConfigStruct
DebugMode bool
cmdName string
ConfigFilePath string
)
func InitConfig(cmd *cobra.Command) error {
@@ -48,7 +51,9 @@ func InitConfig(cmd *cobra.Command) error {
return nil
}
go version.CheckNewerVersion()
if cmd.Use != "console" && cmd.Use != "pro" {
go version.CheckNewerVersion()
}
Config = CreateDefaultConfig()
cmdName = cmd.Name()
@@ -92,7 +97,7 @@ func WriteConfig(config *ConfigStruct) error {
}
data := []byte(template)
if err := os.WriteFile(Config.ConfigFilePath, data, 0644); err != nil {
if err := os.WriteFile(ConfigFilePath, data, 0644); err != nil {
return fmt.Errorf("failed writing config, err: %v", err)
}
@@ -100,20 +105,32 @@ func WriteConfig(config *ConfigStruct) error {
}
func loadConfigFile(configFilePath string, config *ConfigStruct) error {
reader, openErr := os.Open(configFilePath)
if openErr != nil {
return openErr
cwd, err := os.Getwd()
if err != nil {
return err
}
buf, readErr := io.ReadAll(reader)
if readErr != nil {
return readErr
cwdConfig := filepath.Join(cwd, fmt.Sprintf("%s.yaml", misc.Program))
reader, err := os.Open(cwdConfig)
if err != nil {
reader, err = os.Open(configFilePath)
if err != nil {
return err
}
} else {
configFilePath = cwdConfig
}
buf, err := io.ReadAll(reader)
if err != nil {
return err
}
if err := yaml.Unmarshal(buf, config); err != nil {
return err
}
ConfigFilePath = configFilePath
log.Info().Str("path", configFilePath).Msg("Found config file!")
return nil
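The rewritten loadConfigFile above gives a kubeshark.yaml in the current working directory precedence over the configured path. Below is a minimal standalone sketch of that resolution order; the helper name resolveConfigPath and the use of os.Stat instead of os.Open are illustrative, not part of the change.

package main

import (
    "fmt"
    "os"
    "path/filepath"
)

// resolveConfigPath mirrors the lookup order above: prefer <cwd>/<program>.yaml,
// otherwise fall back to the explicitly configured path.
func resolveConfigPath(configFilePath string, program string) (string, error) {
    cwd, err := os.Getwd()
    if err != nil {
        return "", err
    }
    cwdConfig := filepath.Join(cwd, fmt.Sprintf("%s.yaml", program))
    if _, err := os.Stat(cwdConfig); err == nil {
        return cwdConfig, nil // a config file in the working directory wins
    }
    return configFilePath, nil
}

func main() {
    path, _ := resolveConfigPath("/etc/kubeshark/config.yaml", "kubeshark")
    fmt.Println("config file:", path)
}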


@@ -27,14 +27,17 @@ type KubeConfig struct {
}
type ConfigStruct struct {
Tap configStructs.TapConfig `yaml:"tap"`
Logs configStructs.LogsConfig `yaml:"logs"`
Config configStructs.ConfigConfig `yaml:"config,omitempty"`
Kube KubeConfig `yaml:"kube"`
SelfNamespace string `yaml:"selfnamespace" default:"kubeshark"`
DumpLogs bool `yaml:"dumplogs" default:"false"`
ConfigFilePath string `yaml:"configpath,omitempty" readonly:""`
HeadlessMode bool `yaml:"headless" default:"false"`
Tap configStructs.TapConfig `yaml:"tap"`
Logs configStructs.LogsConfig `yaml:"logs"`
Config configStructs.ConfigConfig `yaml:"config,omitempty"`
Kube KubeConfig `yaml:"kube"`
SelfNamespace string `yaml:"selfnamespace" default:"kubeshark"`
DumpLogs bool `yaml:"dumplogs" default:"false"`
ConfigFilePath string `yaml:"configpath,omitempty" readonly:""`
HeadlessMode bool `yaml:"headless" default:"false"`
License string `yaml:"license" default:""`
Scripting configStructs.ScriptingConfig `yaml:"scripting"`
ResourceLabels map[string]string `yaml:"resourceLabels" default:"{}"`
}
func (config *ConfigStruct) SetDefaults() {
@@ -45,6 +48,15 @@ func (config *ConfigStruct) ImagePullPolicy() v1.PullPolicy {
return v1.PullPolicy(config.Tap.Docker.ImagePullPolicy)
}
func (config *ConfigStruct) ImagePullSecrets() []v1.LocalObjectReference {
var ref []v1.LocalObjectReference
for _, name := range config.Tap.Docker.ImagePullSecrets {
ref = append(ref, v1.LocalObjectReference{Name: name})
}
return ref
}
func (config *ConfigStruct) IsNsRestrictedMode() bool {
return config.SelfNamespace != misc.Program // Notice "kubeshark" string must match the default SelfNamespace
}


@@ -0,0 +1,46 @@
package configStructs
import (
"io/fs"
"io/ioutil"
"path/filepath"
"github.com/kubeshark/kubeshark/misc"
"github.com/rs/zerolog/log"
)
type ScriptingConfig struct {
Env map[string]interface{} `yaml:"env"`
Source string `yaml:"source" default:""`
WatchScripts bool `yaml:"watchScripts" default:"true"`
}
func (config *ScriptingConfig) GetScripts() (scripts []*misc.Script, err error) {
if config.Source == "" {
return
}
var files []fs.FileInfo
files, err = ioutil.ReadDir(config.Source)
if err != nil {
return
}
for _, f := range files {
if f.IsDir() {
continue
}
var script *misc.Script
path := filepath.Join(config.Source, f.Name())
script, err = misc.ReadScriptFile(path)
if err != nil {
return
}
scripts = append(scripts, script)
log.Debug().Str("path", path).Msg("Found script:")
}
return
}
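A usage sketch for the new GetScripts; the source directory is a placeholder and error handling is shortened:

package main

import (
    "fmt"

    "github.com/kubeshark/kubeshark/config/configStructs"
)

func main() {
    cfg := configStructs.ScriptingConfig{Source: "/path/to/scripts"}
    scripts, err := cfg.GetScripts() // reads every regular file in Source
    if err != nil {
        panic(err)
    }
    for _, s := range scripts {
        fmt.Printf("%s -> %q\n", s.Path, s.Title)
    }
}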


@@ -4,40 +4,48 @@ import (
"fmt"
"regexp"
"github.com/kubeshark/kubeshark/kubernetes"
"github.com/kubeshark/kubeshark/utils"
"github.com/rs/zerolog/log"
)
const (
DockerRegistryLabel = "docker-registry"
DockerTagLabel = "docker-tag"
ProxyFrontPortLabel = "proxy-front-port"
ProxyHubPortLabel = "proxy-hub-port"
ProxyHostLabel = "proxy-host"
NamespacesLabel = "namespaces"
AllNamespacesLabel = "allnamespaces"
StorageLimitLabel = "storagelimit"
DryRunLabel = "dryrun"
PcapLabel = "pcap"
ServiceMeshLabel = "servicemesh"
TlsLabel = "tls"
DebugLabel = "debug"
DockerRegistryLabel = "docker-registry"
DockerTagLabel = "docker-tag"
DockerImagePullPolicy = "docker-imagepullpolicy"
DockerImagePullSecrets = "docker-imagepullsecrets"
ProxyFrontPortLabel = "proxy-front-port"
ProxyHubPortLabel = "proxy-hub-port"
ProxyHostLabel = "proxy-host"
NamespacesLabel = "namespaces"
AllNamespacesLabel = "allnamespaces"
StorageLimitLabel = "storagelimit"
DryRunLabel = "dryrun"
PcapLabel = "pcap"
ServiceMeshLabel = "servicemesh"
TlsLabel = "tls"
DebugLabel = "debug"
)
type Resources struct {
CpuLimit string `yaml:"cpu-limit" default:"750m"`
MemoryLimit string `yaml:"memory-limit" default:"1Gi"`
CpuRequests string `yaml:"cpu-requests" default:"50m"`
MemoryRequests string `yaml:"memory-requests" default:"50Mi"`
}
type WorkerConfig struct {
SrcPort uint16 `yaml:"src-port" default:"8897"`
DstPort uint16 `yaml:"dst-port" default:"8897"`
SrcPort uint16 `yaml:"port" default:"8897"`
DstPort uint16 `yaml:"srvport" default:"8897"`
}
type HubConfig struct {
SrcPort uint16 `yaml:"src-port" default:"8898"`
DstPort uint16 `yaml:"dst-port" default:"8898"`
SrcPort uint16 `yaml:"port" default:"8898"`
DstPort uint16 `yaml:"srvport" default:"80"`
}
type FrontConfig struct {
SrcPort uint16 `yaml:"src-port" default:"8899"`
DstPort uint16 `yaml:"dst-port" default:"80"`
SrcPort uint16 `yaml:"port" default:"8899"`
DstPort uint16 `yaml:"srvport" default:"80"`
}
type ProxyConfig struct {
@@ -48,14 +56,15 @@ type ProxyConfig struct {
}
type DockerConfig struct {
Registry string `yaml:"registry" default:"docker.io/kubeshark"`
Tag string `yaml:"tag" default:"latest"`
ImagePullPolicy string `yaml:"imagepullpolicy" default:"Always"`
Registry string `yaml:"registry" default:"docker.io/kubeshark"`
Tag string `yaml:"tag" default:"latest"`
ImagePullPolicy string `yaml:"imagepullpolicy" default:"Always"`
ImagePullSecrets []string `yaml:"imagepullsecrets"`
}
type ResourcesConfig struct {
Worker kubernetes.Resources `yaml:"worker"`
Hub kubernetes.Resources `yaml:"hub"`
Worker Resources `yaml:"worker"`
Hub Resources `yaml:"hub"`
}
type TapConfig struct {


@@ -1,6 +1,9 @@
package docker
import "fmt"
import (
"fmt"
"strings"
)
const (
hub = "hub"
@@ -9,7 +12,7 @@ const (
)
var (
registry = "docker.io/kubeshark"
registry = "docker.io/kubeshark/"
tag = "latest"
)
@@ -18,7 +21,11 @@ func GetRegistry() string {
}
func SetRegistry(value string) {
registry = value
if strings.HasPrefix(value, "docker.io/kubeshark") {
registry = "docker.io/kubeshark/"
} else {
registry = value
}
}
func GetTag() string {
@@ -30,7 +37,7 @@ func SetTag(value string) {
}
func getImage(image string) string {
return fmt.Sprintf("%s/%s:%s", registry, image, tag)
return fmt.Sprintf("%s%s:%s", registry, image, tag)
}
func GetHubImage() string {

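Note that after this change the registry value carries its own trailing separator and getImage concatenates it verbatim. A small illustration of the resulting image references; the registry values are examples:

package main

import "fmt"

// getImage mirrors the concatenation above; the registry must end with "/".
func getImage(registry, image, tag string) string {
    return fmt.Sprintf("%s%s:%s", registry, image, tag)
}

func main() {
    fmt.Println(getImage("docker.io/kubeshark/", "hub", "latest")) // docker.io/kubeshark/hub:latest
    fmt.Println(getImage("ghcr.io/example/", "worker", "v1"))      // ghcr.io/example/worker:v1
}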

@@ -18,7 +18,7 @@ func FormatError(err error) error {
if k8serrors.IsForbidden(err) {
errorNew = fmt.Errorf("insufficient permissions: %w. "+
"supply the required permission or control %s's access to namespaces by setting %s "+
"in the config file or setting the targetted namespace with --%s %s=<NAMEPSACE>",
"in the config file or setting the targeted namespace with --%s %s=<NAMEPSACE>",
err,
misc.Software,
config.SelfNamespaceConfigName,

go.mod

@@ -7,8 +7,12 @@ require (
github.com/docker/docker v20.10.22+incompatible
github.com/docker/go-connections v0.4.0
github.com/docker/go-units v0.4.0
github.com/fsnotify/fsnotify v1.5.1
github.com/gin-gonic/gin v1.7.7
github.com/google/go-github/v37 v37.0.0
github.com/kubeshark/base v0.5.0
github.com/gorilla/websocket v1.4.2
github.com/kubeshark/base v0.6.3
github.com/robertkrimen/otto v0.2.1
github.com/rs/zerolog v1.28.0
github.com/spf13/cobra v1.3.0
github.com/spf13/pflag v1.0.5
@@ -38,11 +42,15 @@ require (
github.com/evanphx/json-patch v5.6.0+incompatible // indirect
github.com/exponent-io/jsonpath v0.0.0-20210407135951-1de76d718b3f // indirect
github.com/fvbommel/sortorder v1.0.2 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-errors/errors v1.4.2 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.19.6 // indirect
github.com/go-openapi/swag v0.21.1 // indirect
github.com/go-playground/locales v0.13.0 // indirect
github.com/go-playground/universal-translator v0.17.0 // indirect
github.com/go-playground/validator/v10 v10.4.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v4 v4.2.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
@@ -58,6 +66,7 @@ require (
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/leodido/go-urn v1.2.0 // indirect
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
@@ -78,6 +87,7 @@ require (
github.com/russross/blackfriday v1.6.0 // indirect
github.com/sirupsen/logrus v1.7.0 // indirect
github.com/stretchr/testify v1.8.1 // indirect
github.com/ugorji/go/codec v1.1.7 // indirect
github.com/xlab/treeprint v1.1.0 // indirect
go.starlark.net v0.0.0-20220203230714-bb14e151c28f // indirect
golang.org/x/crypto v0.0.0-20220208050332-20e1d8d225ab // indirect
@@ -92,6 +102,7 @@ require (
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/sourcemap.v1 v1.0.5 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
k8s.io/cli-runtime v0.23.3 // indirect
k8s.io/component-base v0.23.3 // indirect

go.sum

@@ -196,6 +196,10 @@ github.com/fvbommel/sortorder v1.0.2 h1:mV4o8B2hKboCdkJm+a7uX/SIpZob4JzUpc5GGnM4
github.com/fvbommel/sortorder v1.0.2/go.mod h1:uk88iVf1ovNn1iLfgUVU2F9o5eO30ui720w+kxuqRs0=
github.com/getkin/kin-openapi v0.76.0/go.mod h1:660oXbgy5JFMKreazJaQTw7o+X00qeSyhcnluiMv+Xg=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
github.com/gin-gonic/gin v1.7.7 h1:3DoBmSbJbZAWqXJC3SLjAPfutPJJRN1U5pALB7EeTTs=
github.com/gin-gonic/gin v1.7.7/go.mod h1:axIBovoeJpVj8S3BwE0uPMTeReE4+AfFtqpqaZ1qq1U=
github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q=
github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA=
github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og=
@@ -225,6 +229,14 @@ github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh
github.com/go-openapi/swag v0.19.14/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ=
github.com/go-openapi/swag v0.21.1 h1:wm0rhTb5z7qpJRHBdPOMuY4QjVUMbF6/kwoYeRAOrKU=
github.com/go-openapi/swag v0.21.1/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ=
github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A=
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q=
github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8=
github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no=
github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA=
github.com/go-playground/validator/v10 v10.4.1 h1:pH2c5ADXtd66mxoE0Zm9SUhxE20r7aM3F26W0hOn+GE=
github.com/go-playground/validator/v10 v10.4.1/go.mod h1:nlOn6nFhuKACm19sB/8EGNn9GlaMV7XkbRSipzJ0Ii4=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
@@ -331,6 +343,7 @@ github.com/googleapis/gnostic v0.5.5/go.mod h1:7+EbHbldMins07ALC74bsA81Ovc97Dwqy
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 h1:+ngKgrYPPJrOjhax5N+uePQ0Fh1Z7PheYoUI/0nzkPA=
@@ -414,8 +427,10 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kubeshark/base v0.5.0 h1:ZQ/wNIdmLeDwy143korRZyPorcqBgf1y/tQIiIenAL0=
github.com/kubeshark/base v0.5.0/go.mod h1:/ZzBY+5KLaC7J6QUVXtZ0HZALhMcEDrU6Waux5/bHQc=
github.com/kubeshark/base v0.6.3 h1:4MpADgqG51cM4unGSe5J8HBHy9PSvzpw/kcUO5cHMG4=
github.com/kubeshark/base v0.6.3/go.mod h1:/ZzBY+5KLaC7J6QUVXtZ0HZALhMcEDrU6Waux5/bHQc=
github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y=
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de h1:9TO3cAIGXtEhnIaL+V+BEER86oLrvS+kWobKpbJuye0=
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE=
github.com/lithammer/dedent v1.1.0/go.mod h1:jrXYCQtgg0nJiN+StA2KgR7w6CiQNv9Fd/Z9BP0jIOc=
@@ -543,6 +558,8 @@ github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+Gx
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/robertkrimen/otto v0.2.1 h1:FVP0PJ0AHIjC+N4pKCG9yCDz6LHNPCwi/GKID5pGGF0=
github.com/robertkrimen/otto v0.2.1/go.mod h1:UPwtJ1Xu7JrLcZjNWN8orJaM5n5YEtqL//farB5FlRY=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
@@ -610,6 +627,9 @@ github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xlab/treeprint v0.0.0-20181112141820-a009c3971eca/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg=
github.com/xlab/treeprint v1.1.0 h1:G/1DjNkPpfZCFt9CSh6b5/nY4VimlbHF3Rh4obvtzDk=
@@ -1135,6 +1155,8 @@ gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/ini.v1 v1.62.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/ini.v1 v1.66.2/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
gopkg.in/sourcemap.v1 v1.0.5 h1:inv58fC9f9J3TK2Y2R1NPntXEn3/wjWHkonhIUODNTI=
gopkg.in/sourcemap.v1 v1.0.5/go.mod h1:2RlvNNSMglmRrcvhfuzp4hQHwOtjxlbjX7UPY/GXb78=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=


@@ -3,16 +3,16 @@ package connect
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"time"
"github.com/kubeshark/kubeshark/config"
"github.com/kubeshark/kubeshark/misc"
"github.com/kubeshark/kubeshark/utils"
"github.com/rs/zerolog/log"
core "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
)
@@ -24,6 +24,7 @@ type Connector struct {
const DefaultRetries = 3
const DefaultTimeout = 2 * time.Second
const DefaultSleep = 1 * time.Second
func NewConnector(url string, retries int, timeout time.Duration) *Connector {
return &Connector{
@@ -45,7 +46,7 @@ func (connector *Connector) TestConnection(path string) error {
break
}
retriesLeft -= 1
time.Sleep(time.Second)
time.Sleep(5 * DefaultSleep)
}
if retriesLeft == 0 {
@@ -64,7 +65,6 @@ func (connector *Connector) isReachable(path string) (bool, error) {
}
func (connector *Connector) PostWorkerPodToHub(pod *v1.Pod) {
// TODO: This request is responsible for proxy_server.go:147] Error while proxying request: context canceled log
postWorkerUrl := fmt.Sprintf("%s/pods/worker", connector.url)
if podMarshalled, err := json.Marshal(pod); err != nil {
@@ -72,17 +72,17 @@ func (connector *Connector) PostWorkerPodToHub(pod *v1.Pod) {
} else {
ok := false
for !ok {
if _, err = utils.Post(postWorkerUrl, "application/json", bytes.NewBuffer(podMarshalled), connector.client); err != nil {
var resp *http.Response
if resp, err = utils.Post(postWorkerUrl, "application/json", bytes.NewBuffer(podMarshalled), connector.client); err != nil || resp.StatusCode != http.StatusOK {
if _, ok := err.(*url.Error); ok {
break
}
log.Debug().Err(err).Msg("Failed sending the Worker pod to Hub:")
log.Warn().Err(err).Msg("Failed sending the Worker pod to Hub. Retrying...")
} else {
ok = true
log.Debug().Interface("worker-pod", pod).Msg("Reported worker pod to Hub:")
connector.PostStorageLimitToHub(config.Config.Tap.StorageLimitBytes())
return
}
time.Sleep(time.Second)
time.Sleep(DefaultSleep)
}
}
}
@@ -102,38 +102,248 @@ func (connector *Connector) PostStorageLimitToHub(limit int64) {
} else {
ok := false
for !ok {
if _, err = utils.Post(postStorageLimitUrl, "application/json", bytes.NewBuffer(payloadMarshalled), connector.client); err != nil {
var resp *http.Response
if resp, err = utils.Post(postStorageLimitUrl, "application/json", bytes.NewBuffer(payloadMarshalled), connector.client); err != nil || resp.StatusCode != http.StatusOK {
if _, ok := err.(*url.Error); ok {
break
}
log.Debug().Err(err).Msg("Failed sending the storage limit to Hub:")
log.Warn().Err(err).Msg("Failed sending the storage limit to Hub. Retrying...")
} else {
ok = true
log.Debug().Int("limit", int(limit)).Msg("Reported storage limit to Hub:")
return
}
time.Sleep(time.Second)
time.Sleep(DefaultSleep)
}
}
}
func (connector *Connector) PostTargettedPodsToHub(pods []core.Pod) {
postTargettedUrl := fmt.Sprintf("%s/pods/targetted", connector.url)
type postRegexRequest struct {
Regex string `json:"regex"`
Namespaces []string `json:"namespaces"`
}
if podsMarshalled, err := json.Marshal(pods); err != nil {
log.Error().Err(err).Msg("Failed to marshal the targetted pods:")
func (connector *Connector) PostRegexToHub(regex string, namespaces []string) {
postRegexUrl := fmt.Sprintf("%s/pods/regex", connector.url)
payload := postRegexRequest{
Regex: regex,
Namespaces: namespaces,
}
if payloadMarshalled, err := json.Marshal(payload); err != nil {
log.Error().Err(err).Msg("Failed to marshal the pod regex:")
} else {
ok := false
for !ok {
if _, err = utils.Post(postTargettedUrl, "application/json", bytes.NewBuffer(podsMarshalled), connector.client); err != nil {
var resp *http.Response
if resp, err = utils.Post(postRegexUrl, "application/json", bytes.NewBuffer(payloadMarshalled), connector.client); err != nil || resp.StatusCode != http.StatusOK {
if _, ok := err.(*url.Error); ok {
break
}
log.Debug().Err(err).Msg("Failed sending the targetted pods to Hub:")
log.Warn().Err(err).Msg("Failed sending the pod regex to Hub. Retrying...")
} else {
ok = true
log.Debug().Int("pod-count", len(pods)).Msg("Reported targetted pods to Hub:")
log.Debug().Str("regex", regex).Strs("namespaces", namespaces).Msg("Reported pod regex to Hub:")
return
}
time.Sleep(time.Second)
time.Sleep(DefaultSleep)
}
}
}
type postLicenseRequest struct {
License string `json:"license"`
}
func (connector *Connector) PostLicense(license string) {
postLicenseUrl := fmt.Sprintf("%s/license", connector.url)
payload := postLicenseRequest{
License: license,
}
if payloadMarshalled, err := json.Marshal(payload); err != nil {
log.Error().Err(err).Msg("Failed to marshal the payload:")
} else {
ok := false
for !ok {
var resp *http.Response
if resp, err = utils.Post(postLicenseUrl, "application/json", bytes.NewBuffer(payloadMarshalled), connector.client); err != nil || resp.StatusCode != http.StatusOK {
if _, ok := err.(*url.Error); ok {
break
}
log.Warn().Err(err).Msg("Failed sending the license to Hub. Retrying...")
} else {
log.Debug().Str("license", license).Msg("Reported license to Hub:")
return
}
time.Sleep(DefaultSleep)
}
}
}
func (connector *Connector) PostEnv(env map[string]interface{}) {
if len(env) == 0 {
return
}
postEnvUrl := fmt.Sprintf("%s/scripts/env", connector.url)
if envMarshalled, err := json.Marshal(env); err != nil {
log.Error().Err(err).Msg("Failed to marshal the env:")
} else {
ok := false
for !ok {
var resp *http.Response
if resp, err = utils.Post(postEnvUrl, "application/json", bytes.NewBuffer(envMarshalled), connector.client); err != nil || resp.StatusCode != http.StatusOK {
if _, ok := err.(*url.Error); ok {
break
}
log.Warn().Err(err).Msg("Failed sending the scripting environment variables to Hub. Retrying...")
} else {
log.Debug().Interface("env", env).Msg("Reported scripting environment variables to Hub:")
return
}
time.Sleep(DefaultSleep)
}
}
}
func (connector *Connector) PostScript(script *misc.Script) (index int64, err error) {
postScriptUrl := fmt.Sprintf("%s/scripts", connector.url)
var scriptMarshalled []byte
if scriptMarshalled, err = json.Marshal(script); err != nil {
log.Error().Err(err).Msg("Failed to marshal the script:")
} else {
ok := false
for !ok {
var resp *http.Response
if resp, err = utils.Post(postScriptUrl, "application/json", bytes.NewBuffer(scriptMarshalled), connector.client); err != nil || resp.StatusCode != http.StatusOK {
if _, ok := err.(*url.Error); ok {
break
}
log.Warn().Err(err).Msg("Failed creating script Hub:")
} else {
var j map[string]interface{}
err = json.NewDecoder(resp.Body).Decode(&j)
if err != nil {
return
}
val, ok := j["index"]
if !ok {
err = errors.New("Response does not contain `index` field!")
return
}
index = int64(val.(float64))
log.Debug().Int("index", int(index)).Interface("script", script).Msg("Created script on Hub:")
return
}
time.Sleep(DefaultSleep)
}
}
return
}
func (connector *Connector) PutScript(script *misc.Script, index int64) (err error) {
putScriptUrl := fmt.Sprintf("%s/scripts/%d", connector.url, index)
var scriptMarshalled []byte
if scriptMarshalled, err = json.Marshal(script); err != nil {
log.Error().Err(err).Msg("Failed to marshal the script:")
} else {
ok := false
for !ok {
client := &http.Client{}
var req *http.Request
req, err = http.NewRequest(http.MethodPut, putScriptUrl, bytes.NewBuffer(scriptMarshalled))
if err != nil {
log.Error().Err(err).Send()
return
}
req.Header.Set("Content-Type", "application/json")
var resp *http.Response
resp, err = client.Do(req)
if err != nil {
log.Error().Err(err).Send()
return
}
if resp.StatusCode != http.StatusOK {
if _, ok := err.(*url.Error); ok {
break
}
log.Warn().Err(err).Msg("Failed updating script on Hub:")
} else {
log.Debug().Int("index", int(index)).Interface("script", script).Msg("Updated script on Hub:")
return
}
time.Sleep(DefaultSleep)
}
}
return
}
func (connector *Connector) DeleteScript(index int64) (err error) {
deleteScriptUrl := fmt.Sprintf("%s/scripts/%d", connector.url, index)
ok := false
for !ok {
client := &http.Client{}
var req *http.Request
req, err = http.NewRequest(http.MethodDelete, deleteScriptUrl, nil)
if err != nil {
log.Error().Err(err).Send()
return
}
req.Header.Set("Content-Type", "application/json")
var resp *http.Response
resp, err = client.Do(req)
if err != nil {
log.Error().Err(err).Send()
return
}
if resp.StatusCode != http.StatusOK {
if _, ok := err.(*url.Error); ok {
break
}
log.Warn().Err(err).Msg("Failed deleting script on Hub:")
} else {
log.Debug().Int("index", int(index)).Msg("Deleted script on Hub:")
return
}
time.Sleep(DefaultSleep)
}
return
}
func (connector *Connector) PostScriptDone() {
postScriptDoneUrl := fmt.Sprintf("%s/scripts/done", connector.url)
ok := false
var err error
for !ok {
var resp *http.Response
if resp, err = utils.Post(postScriptDoneUrl, "application/json", nil, connector.client); err != nil || resp.StatusCode != http.StatusOK {
if _, ok := err.(*url.Error); ok {
break
}
log.Warn().Err(err).Msg("Failed sending the POST scripts done to Hub. Retrying...")
} else {
log.Debug().Msg("Reported POST scripts done to Hub.")
return
}
time.Sleep(DefaultSleep)
}
}
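All of the Hub POST helpers above share one retry shape: loop until a 200 OK, abort on a *url.Error (the proxy is likely gone), and wait DefaultSleep between attempts. A condensed, self-contained sketch of that pattern; postWithRetry and the endpoint are illustrative, not part of the package:

package main

import (
    "bytes"
    "net/http"
    "net/url"
    "time"
)

const defaultSleep = 1 * time.Second

// postWithRetry retries a JSON POST until the server answers 200 OK.
// Transport-level failures (*url.Error) abort immediately, matching the
// helpers above: once the proxy is torn down, retrying is pointless.
func postWithRetry(client *http.Client, endpoint string, payload []byte) bool {
    for {
        resp, err := client.Post(endpoint, "application/json", bytes.NewBuffer(payload))
        if err == nil && resp.StatusCode == http.StatusOK {
            resp.Body.Close()
            return true
        }
        if resp != nil {
            resp.Body.Close()
        }
        if _, isURLErr := err.(*url.Error); isURLErr {
            return false
        }
        time.Sleep(defaultSleep)
    }
}

func main() {
    _ = postWithRetry(&http.Client{Timeout: 2 * time.Second}, "http://localhost:8898/echo", []byte(`{}`))
}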


@@ -14,7 +14,6 @@ const (
ServiceAccountName = SelfResourcesPrefix + "service-account"
WorkerDaemonSetName = SelfResourcesPrefix + "worker-daemon-set"
WorkerPodName = SelfResourcesPrefix + "worker"
ConfigMapName = SelfResourcesPrefix + "config"
MinKubernetesServerVersion = "1.16.0"
)


@@ -10,6 +10,7 @@ import (
"path/filepath"
"regexp"
"github.com/kubeshark/kubeshark/config/configStructs"
"github.com/kubeshark/kubeshark/docker"
"github.com/kubeshark/kubeshark/misc"
"github.com/kubeshark/kubeshark/semver"
@@ -26,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/watch"
applyconfapp "k8s.io/client-go/applyconfigurations/apps/v1"
applyconfcore "k8s.io/client-go/applyconfigurations/core/v1"
v1 "k8s.io/client-go/applyconfigurations/core/v1"
applyconfmeta "k8s.io/client-go/applyconfigurations/meta/v1"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth"
@@ -160,11 +162,8 @@ func (provider *Provider) WaitUtilNamespaceDeleted(ctx context.Context, name str
func (provider *Provider) CreateNamespace(ctx context.Context, name string) (*core.Namespace, error) {
namespaceSpec := &core.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: map[string]string{
LabelManagedBy: provider.managedBy,
LabelCreatedBy: provider.createdBy,
},
Name: name,
Labels: buildWithDefaultLabels(map[string]string{}, provider),
},
}
return provider.clientSet.CoreV1().Namespaces().Create(ctx, namespaceSpec, metav1.CreateOptions{})
@@ -175,15 +174,13 @@ type PodOptions struct {
PodName string
PodImage string
ServiceAccountName string
Resources Resources
Resources configStructs.Resources
ImagePullPolicy core.PullPolicy
ImagePullSecrets []core.LocalObjectReference
Debug bool
}
func (provider *Provider) BuildHubPod(opts *PodOptions) (*core.Pod, error) {
configMapVolume := &core.ConfigMapVolumeSource{}
configMapVolume.Name = ConfigMapName
cpuLimit, err := resource.ParseQuantity(opts.Resources.CpuLimit)
if err != nil {
return nil, fmt.Errorf("invalid cpu limit for %s container", opts.PodName)
@@ -231,11 +228,9 @@ func (provider *Provider) BuildHubPod(opts *PodOptions) (*core.Pod, error) {
pod := &core.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: opts.PodName,
Labels: map[string]string{
"app": opts.PodName,
LabelManagedBy: provider.managedBy,
LabelCreatedBy: provider.createdBy,
},
Labels: buildWithDefaultLabels(map[string]string{
"app": opts.PodName,
}, provider),
},
Spec: core.PodSpec{
Containers: containers,
@@ -251,6 +246,7 @@ func (provider *Provider) BuildHubPod(opts *PodOptions) (*core.Pod, error) {
Effect: core.TaintEffectNoSchedule,
},
},
ImagePullSecrets: opts.ImagePullSecrets,
},
}
@@ -262,9 +258,6 @@ func (provider *Provider) BuildHubPod(opts *PodOptions) (*core.Pod, error) {
}
func (provider *Provider) BuildFrontPod(opts *PodOptions, hubHost string, hubPort string) (*core.Pod, error) {
configMapVolume := &core.ConfigMapVolumeSource{}
configMapVolume.Name = ConfigMapName
cpuLimit, err := resource.ParseQuantity(opts.Resources.CpuLimit)
if err != nil {
return nil, fmt.Errorf("invalid cpu limit for %s container", opts.PodName)
@@ -315,7 +308,7 @@ func (provider *Provider) BuildFrontPod(opts *PodOptions, hubHost string, hubPor
Env: []core.EnvVar{
{
Name: "REACT_APP_DEFAULT_FILTER",
Value: "timestamp >= now()",
Value: " ",
},
{
Name: "REACT_APP_HUB_HOST",
@@ -332,11 +325,9 @@ func (provider *Provider) BuildFrontPod(opts *PodOptions, hubHost string, hubPor
pod := &core.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: opts.PodName,
Labels: map[string]string{
"app": opts.PodName,
LabelManagedBy: provider.managedBy,
LabelCreatedBy: provider.createdBy,
},
Labels: buildWithDefaultLabels(map[string]string{
"app": opts.PodName,
}, provider),
},
Spec: core.PodSpec{
Containers: containers,
@@ -353,6 +344,7 @@ func (provider *Provider) BuildFrontPod(opts *PodOptions, hubHost string, hubPor
Effect: core.TaintEffectNoSchedule,
},
},
ImagePullSecrets: opts.ImagePullSecrets,
},
}
@@ -370,11 +362,8 @@ func (provider *Provider) CreatePod(ctx context.Context, namespace string, podSp
func (provider *Provider) CreateService(ctx context.Context, namespace string, serviceName string, appLabelValue string, targetPort int, port int32) (*core.Service, error) {
service := core.Service{
ObjectMeta: metav1.ObjectMeta{
Name: serviceName,
Labels: map[string]string{
LabelManagedBy: provider.managedBy,
LabelCreatedBy: provider.createdBy,
},
Name: serviceName,
Labels: buildWithDefaultLabels(map[string]string{}, provider),
},
Spec: core.ServiceSpec{
Ports: []core.ServicePort{
@@ -416,11 +405,6 @@ func (provider *Provider) DoesNamespaceExist(ctx context.Context, name string) (
return provider.doesResourceExist(namespaceResource, err)
}
func (provider *Provider) DoesConfigMapExist(ctx context.Context, namespace string, name string) (bool, error) {
configMapResource, err := provider.clientSet.CoreV1().ConfigMaps(namespace).Get(ctx, name, metav1.GetOptions{})
return provider.doesResourceExist(configMapResource, err)
}
func (provider *Provider) DoesServiceAccountExist(ctx context.Context, namespace string, name string) (bool, error) {
serviceAccountResource, err := provider.clientSet.CoreV1().ServiceAccounts(namespace).Get(ctx, name, metav1.GetOptions{})
return provider.doesResourceExist(serviceAccountResource, err)
@@ -468,21 +452,17 @@ func (provider *Provider) CreateSelfRBAC(ctx context.Context, namespace string,
serviceAccount := &core.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Name: serviceAccountName,
Labels: map[string]string{
Labels: buildWithDefaultLabels(map[string]string{
fmt.Sprintf("%s-cli-version", misc.Program): version,
LabelManagedBy: provider.managedBy,
LabelCreatedBy: provider.createdBy,
},
}, provider),
},
}
clusterRole := &rbac.ClusterRole{
ObjectMeta: metav1.ObjectMeta{
Name: clusterRoleName,
Labels: map[string]string{
Labels: buildWithDefaultLabels(map[string]string{
fmt.Sprintf("%s-cli-version", misc.Program): version,
LabelManagedBy: provider.managedBy,
LabelCreatedBy: provider.createdBy,
},
}, provider),
},
Rules: []rbac.PolicyRule{
{
@@ -495,11 +475,9 @@ func (provider *Provider) CreateSelfRBAC(ctx context.Context, namespace string,
clusterRoleBinding := &rbac.ClusterRoleBinding{
ObjectMeta: metav1.ObjectMeta{
Name: clusterRoleBindingName,
Labels: map[string]string{
Labels: buildWithDefaultLabels(map[string]string{
fmt.Sprintf("%s-cli-version", misc.Program): version,
LabelManagedBy: provider.managedBy,
LabelCreatedBy: provider.createdBy,
},
}, provider),
},
RoleRef: rbac.RoleRef{
Name: clusterRoleName,
@@ -533,21 +511,17 @@ func (provider *Provider) CreateSelfRBACNamespaceRestricted(ctx context.Context,
serviceAccount := &core.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Name: serviceAccountName,
Labels: map[string]string{
Labels: buildWithDefaultLabels(map[string]string{
fmt.Sprintf("%s-cli-version", misc.Program): version,
LabelManagedBy: provider.managedBy,
LabelCreatedBy: provider.createdBy,
},
}, provider),
},
}
role := &rbac.Role{
ObjectMeta: metav1.ObjectMeta{
Name: roleName,
Labels: map[string]string{
Labels: buildWithDefaultLabels(map[string]string{
fmt.Sprintf("%s-cli-version", misc.Program): version,
LabelManagedBy: provider.managedBy,
LabelCreatedBy: provider.createdBy,
},
}, provider),
},
Rules: []rbac.PolicyRule{
{
@@ -560,11 +534,9 @@ func (provider *Provider) CreateSelfRBACNamespaceRestricted(ctx context.Context,
roleBinding := &rbac.RoleBinding{
ObjectMeta: metav1.ObjectMeta{
Name: roleBindingName,
Labels: map[string]string{
Labels: buildWithDefaultLabels(map[string]string{
fmt.Sprintf("%s-cli-version", misc.Program): version,
LabelManagedBy: provider.managedBy,
LabelCreatedBy: provider.createdBy,
},
}, provider),
},
RoleRef: rbac.RoleRef{
Name: roleName,
@@ -660,26 +632,21 @@ func (provider *Provider) ApplyWorkerDaemonSet(
daemonSetName string,
podImage string,
workerPodName string,
nodeNames []string,
serviceAccountName string,
resources Resources,
resources configStructs.Resources,
imagePullPolicy core.PullPolicy,
imagePullSecrets []core.LocalObjectReference,
serviceMesh bool,
tls bool,
debug bool,
) error {
log.Debug().
Int("node-count", len(nodeNames)).
Str("namespace", namespace).
Str("daemonset-name", daemonSetName).
Str("image", podImage).
Str("pod", workerPodName).
Msg("Applying worker DaemonSets.")
if len(nodeNames) == 0 {
return fmt.Errorf("DaemonSet %s must target at least 1 pod", daemonSetName)
}
command := []string{"./worker", "-i", "any", "-port", "8897"}
if debug {
@@ -724,14 +691,19 @@ func (provider *Provider) ApplyWorkerDaemonSet(
workerContainer.WithCommand(command...)
var envvars []*v1.EnvVarApplyConfiguration
// Worker build with -race flag requires the GODEBUG=netdns=go
// envvars = append(envvars, applyconfcore.EnvVar().WithName("GODEBUG").WithValue("netdns=go"))
if debug {
workerContainer.WithEnv(
applyconfcore.EnvVar().WithName("MEMORY_PROFILING_ENABLED").WithValue("true"),
applyconfcore.EnvVar().WithName("MEMORY_PROFILING_INTERVAL_SECONDS").WithValue("10"),
applyconfcore.EnvVar().WithName("MEMORY_USAGE_INTERVAL_MILLISECONDS").WithValue("500"),
)
envvars = append(envvars, applyconfcore.EnvVar().WithName("MEMORY_PROFILING_ENABLED").WithValue("true"))
envvars = append(envvars, applyconfcore.EnvVar().WithName("MEMORY_PROFILING_INTERVAL_SECONDS").WithValue("10"))
envvars = append(envvars, applyconfcore.EnvVar().WithName("MEMORY_USAGE_INTERVAL_MILLISECONDS").WithValue("500"))
}
workerContainer.WithEnv(envvars...)
cpuLimit, err := resource.ParseQuantity(resources.CpuLimit)
if err != nil {
return fmt.Errorf("invalid cpu limit for %s container", workerPodName)
@@ -759,22 +731,7 @@ func (provider *Provider) ApplyWorkerDaemonSet(
workerResources := applyconfcore.ResourceRequirements().WithRequests(workerResourceRequests).WithLimits(workerResourceLimits)
workerContainer.WithResources(workerResources)
matchFields := make([]*applyconfcore.NodeSelectorTermApplyConfiguration, 0)
for _, nodeName := range nodeNames {
nodeSelectorRequirement := applyconfcore.NodeSelectorRequirement()
nodeSelectorRequirement.WithKey("metadata.name")
nodeSelectorRequirement.WithOperator(core.NodeSelectorOpIn)
nodeSelectorRequirement.WithValues(nodeName)
nodeSelectorTerm := applyconfcore.NodeSelectorTerm()
nodeSelectorTerm.WithMatchFields(nodeSelectorRequirement)
matchFields = append(matchFields, nodeSelectorTerm)
}
nodeSelector := applyconfcore.NodeSelector()
nodeSelector.WithNodeSelectorTerms(matchFields...)
nodeAffinity := applyconfcore.NodeAffinity()
nodeAffinity.WithRequiredDuringSchedulingIgnoredDuringExecution(nodeSelector)
affinity := applyconfcore.Affinity()
affinity.WithNodeAffinity(nodeAffinity)
@@ -812,12 +769,18 @@ func (provider *Provider) ApplyWorkerDaemonSet(
podSpec.WithTolerations(noExecuteToleration, noScheduleToleration)
podSpec.WithVolumes(procfsVolume, sysfsVolume)
if len(imagePullSecrets) > 0 {
// Build one reference per secret; reusing a single builder would keep only the last name.
refs := make([]*applyconfcore.LocalObjectReferenceApplyConfiguration, 0, len(imagePullSecrets))
for _, secret := range imagePullSecrets {
refs = append(refs, applyconfcore.LocalObjectReference().WithName(secret.Name))
}
podSpec.WithImagePullSecrets(refs...)
}
podTemplate := applyconfcore.PodTemplateSpec()
podTemplate.WithLabels(map[string]string{
"app": workerPodName,
LabelManagedBy: provider.managedBy,
LabelCreatedBy: provider.createdBy,
})
podTemplate.WithLabels(buildWithDefaultLabels(map[string]string{
"app": workerPodName,
}, provider))
podTemplate.WithSpec(podSpec)
labelSelector := applyconfmeta.LabelSelector()
@@ -830,10 +793,7 @@ func (provider *Provider) ApplyWorkerDaemonSet(
daemonSet := applyconfapp.DaemonSet(daemonSetName, namespace)
daemonSet.
WithLabels(map[string]string{
LabelManagedBy: provider.managedBy,
LabelCreatedBy: provider.createdBy,
}).
WithLabels(buildWithDefaultLabels(map[string]string{}, provider)).
WithSpec(applyconfapp.DaemonSetSpec().WithSelector(labelSelector).WithTemplate(podTemplate))
_, err = provider.clientSet.AppsV1().DaemonSets(namespace).Apply(ctx, daemonSet, applyOptions)
@@ -862,11 +822,9 @@ func (provider *Provider) ResetWorkerDaemonSet(ctx context.Context, namespace st
podSpec.WithAffinity(affinity)
podTemplate := applyconfcore.PodTemplateSpec()
podTemplate.WithLabels(map[string]string{
"app": workerPodName,
LabelManagedBy: provider.managedBy,
LabelCreatedBy: provider.createdBy,
})
podTemplate.WithLabels(buildWithDefaultLabels(map[string]string{
"app": workerPodName,
}, provider))
podTemplate.WithSpec(podSpec)
labelSelector := applyconfmeta.LabelSelector()
@@ -879,10 +837,7 @@ func (provider *Provider) ResetWorkerDaemonSet(ctx context.Context, namespace st
daemonSet := applyconfapp.DaemonSet(daemonSetName, namespace)
daemonSet.
WithLabels(map[string]string{
LabelManagedBy: provider.managedBy,
LabelCreatedBy: provider.createdBy,
}).
WithLabels(buildWithDefaultLabels(map[string]string{}, provider)).
WithSpec(applyconfapp.DaemonSetSpec().WithSelector(labelSelector).WithTemplate(podTemplate))
_, err := provider.clientSet.AppsV1().DaemonSets(namespace).Apply(ctx, daemonSet, applyOptions)


@@ -21,7 +21,7 @@ import (
const k8sProxyApiPrefix = "/"
const selfServicePort = 80
func StartProxy(kubernetesProvider *Provider, proxyHost string, srcPort uint16, selfNamespace string, selfServiceName string, cancel context.CancelFunc) (*http.Server, error) {
func StartProxy(kubernetesProvider *Provider, proxyHost string, srcPort uint16, selfNamespace string, selfServiceName string) (*http.Server, error) {
log.Info().
Str("namespace", selfNamespace).
Str("service", selfServiceName).
@@ -55,7 +55,7 @@ func StartProxy(kubernetesProvider *Provider, proxyHost string, srcPort uint16,
go func() {
if err := server.Serve(l); err != nil && err != http.ErrServerClosed {
log.Error().Err(err).Msg("While creating proxy!")
cancel()
return
}
}()
@@ -99,7 +99,7 @@ func getRerouteHttpHandlerSelfStatic(proxyHandler http.Handler, selfNamespace st
})
}
func NewPortForward(kubernetesProvider *Provider, namespace string, podRegex *regexp.Regexp, srcPort uint16, dstPort uint16, ctx context.Context, cancel context.CancelFunc) (*portforward.PortForwarder, error) {
func NewPortForward(kubernetesProvider *Provider, namespace string, podRegex *regexp.Regexp, srcPort uint16, dstPort uint16, ctx context.Context) (*portforward.PortForwarder, error) {
pods, err := kubernetesProvider.ListAllRunningPodsMatchingRegex(ctx, podRegex, []string{namespace})
if err != nil {
return nil, err
@@ -132,7 +132,7 @@ func NewPortForward(kubernetesProvider *Provider, namespace string, podRegex *re
go func() {
if err = forwarder.ForwardPorts(); err != nil {
log.Error().Err(err).Msg("While Kubernetes port-forwarding!")
cancel()
return
}
}()


@@ -1,8 +0,0 @@
package kubernetes
type Resources struct {
CpuLimit string `yaml:"cpu-limit" default:"750m"`
MemoryLimit string `yaml:"memory-limit" default:"1Gi"`
CpuRequests string `yaml:"cpu-requests" default:"50m"`
MemoryRequests string `yaml:"memory-requests" default:"50Mi"`
}


@@ -1,26 +1,25 @@
package kubernetes
import (
"regexp"
"github.com/kubeshark/base/pkg/models"
"github.com/kubeshark/kubeshark/config"
core "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func GetNodeHostToTargettedPodsMap(targettedPods []core.Pod) models.NodeToPodsMap {
nodeToTargettedPodsMap := make(models.NodeToPodsMap)
for _, pod := range targettedPods {
func GetNodeHostToTargetedPodsMap(targetedPods []core.Pod) models.NodeToPodsMap {
nodeToTargetedPodsMap := make(models.NodeToPodsMap)
for _, pod := range targetedPods {
minimizedPod := getMinimizedPod(pod)
existingList := nodeToTargettedPodsMap[pod.Spec.NodeName]
existingList := nodeToTargetedPodsMap[pod.Spec.NodeName]
if existingList == nil {
nodeToTargettedPodsMap[pod.Spec.NodeName] = []core.Pod{minimizedPod}
nodeToTargetedPodsMap[pod.Spec.NodeName] = []core.Pod{minimizedPod}
} else {
nodeToTargettedPodsMap[pod.Spec.NodeName] = append(nodeToTargettedPodsMap[pod.Spec.NodeName], minimizedPod)
nodeToTargetedPodsMap[pod.Spec.NodeName] = append(nodeToTargetedPodsMap[pod.Spec.NodeName], minimizedPod)
}
}
return nodeToTargettedPodsMap
return nodeToTargetedPodsMap
}
func getMinimizedPod(fullPod core.Pod) core.Pod {
@@ -48,44 +47,6 @@ func getMinimizedContainerStatuses(fullPod core.Pod) []core.ContainerStatus {
return result
}
func excludeSelfPods(pods []core.Pod) []core.Pod {
selfPrefixRegex := regexp.MustCompile("^" + SelfResourcesPrefix)
nonSelfPods := make([]core.Pod, 0)
for _, pod := range pods {
if !selfPrefixRegex.MatchString(pod.Name) {
nonSelfPods = append(nonSelfPods, pod)
}
}
return nonSelfPods
}
func getPodArrayDiff(oldPods []core.Pod, newPods []core.Pod) (added []core.Pod, removed []core.Pod) {
added = getMissingPods(newPods, oldPods)
removed = getMissingPods(oldPods, newPods)
return added, removed
}
// Returns pods present in the pods1 array and missing in the pods2 array
func getMissingPods(pods1 []core.Pod, pods2 []core.Pod) []core.Pod {
missingPods := make([]core.Pod, 0)
for _, pod1 := range pods1 {
var found = false
for _, pod2 := range pods2 {
if pod1.UID == pod2.UID {
found = true
break
}
}
if !found {
missingPods = append(missingPods, pod1)
}
}
return missingPods
}
func GetPodInfosForPods(pods []core.Pod) []*models.PodInfo {
podInfos := make([]*models.PodInfo, 0)
for _, pod := range pods {
@@ -93,3 +54,14 @@ func GetPodInfosForPods(pods []core.Pod) []*models.PodInfo {
}
return podInfos
}
func buildWithDefaultLabels(labels map[string]string, provider *Provider) map[string]string {
labels["LabelManagedBy"] = provider.managedBy
labels["LabelCreatedBy"] = provider.createdBy
for k, v := range config.Config.ResourceLabels {
labels[k] = v
}
return labels
}
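A standalone sketch of the labeling behavior buildWithDefaultLabels introduces: management labels are set first, then any user-supplied resourceLabels from the config are overlaid and can override them. The constant values below are stand-ins; the real keys live in the kubernetes package:

package main

import "fmt"

const (
    labelManagedBy = "kubeshark.co/managed-by" // stand-in key
    labelCreatedBy = "kubeshark.co/created-by" // stand-in key
)

func buildWithDefaultLabels(labels map[string]string, managedBy, createdBy string, resourceLabels map[string]string) map[string]string {
    labels[labelManagedBy] = managedBy
    labels[labelCreatedBy] = createdBy
    for k, v := range resourceLabels { // config-supplied labels win on conflict
        labels[k] = v
    }
    return labels
}

func main() {
    labels := buildWithDefaultLabels(
        map[string]string{"app": "kubeshark-hub"},
        "kubeshark", "cli",
        map[string]string{"team": "observability"},
    )
    fmt.Println(labels)
}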


@@ -1,389 +0,0 @@
package kubernetes
import (
"context"
"fmt"
"regexp"
"time"
"github.com/kubeshark/base/pkg/models"
"github.com/kubeshark/kubeshark/debounce"
"github.com/kubeshark/kubeshark/docker"
"github.com/kubeshark/kubeshark/misc"
"github.com/kubeshark/kubeshark/utils"
"github.com/rs/zerolog/log"
v1 "k8s.io/api/core/v1"
)
const updateWorkersDelay = 5 * time.Second
type TargettedPodChangeEvent struct {
Added []v1.Pod
Removed []v1.Pod
}
// WorkerSyncer uses a k8s pod watch to update Worker daemonsets when targeted pods are removed or created
type WorkerSyncer struct {
startTime time.Time
context context.Context
CurrentlyTargettedPods []v1.Pod
config WorkerSyncerConfig
kubernetesProvider *Provider
TapPodChangesOut chan TargettedPodChangeEvent
WorkerPodsChanges chan *v1.Pod
ErrorOut chan K8sTapManagerError
nodeToTargettedPodMap models.NodeToPodsMap
targettedNodes []string
}
type WorkerSyncerConfig struct {
TargetNamespaces []string
PodFilterRegex regexp.Regexp
SelfNamespace string
WorkerResources Resources
ImagePullPolicy v1.PullPolicy
SelfServiceAccountExists bool
ServiceMesh bool
Tls bool
Debug bool
}
func CreateAndStartWorkerSyncer(ctx context.Context, kubernetesProvider *Provider, config WorkerSyncerConfig, startTime time.Time) (*WorkerSyncer, error) {
syncer := &WorkerSyncer{
startTime: startTime.Truncate(time.Second), // Round down because k8s CreationTimestamp is given in 1 sec resolution.
context: ctx,
CurrentlyTargettedPods: make([]v1.Pod, 0),
config: config,
kubernetesProvider: kubernetesProvider,
TapPodChangesOut: make(chan TargettedPodChangeEvent, 100),
WorkerPodsChanges: make(chan *v1.Pod, 100),
ErrorOut: make(chan K8sTapManagerError, 100),
}
if err, _ := syncer.updateCurrentlyTargettedPods(); err != nil {
return nil, err
}
if err := syncer.updateWorkers(); err != nil {
return nil, err
}
go syncer.watchPodsForTargetting()
go syncer.watchWorkerEvents()
go syncer.watchWorkerPods()
return syncer, nil
}
func (workerSyncer *WorkerSyncer) watchWorkerPods() {
selfResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", WorkerPodName))
podWatchHelper := NewPodWatchHelper(workerSyncer.kubernetesProvider, selfResourceRegex)
eventChan, errorChan := FilteredWatch(workerSyncer.context, podWatchHelper, []string{workerSyncer.config.SelfNamespace}, podWatchHelper)
for {
select {
case wEvent, ok := <-eventChan:
if !ok {
eventChan = nil
continue
}
pod, err := wEvent.ToPod()
if err != nil {
log.Error().Str("pod", WorkerPodName).Err(err).Msg(fmt.Sprintf("While parsing %s resource!", misc.Software))
continue
}
log.Debug().
Str("pod", pod.Name).
Str("node", pod.Spec.NodeName).
Interface("phase", pod.Status.Phase).
Msg("Watching pod events...")
if pod.Spec.NodeName != "" {
workerSyncer.WorkerPodsChanges <- pod
}
case err, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
log.Error().Str("pod", WorkerPodName).Err(err).Msg("While watching pod!")
case <-workerSyncer.context.Done():
log.Debug().
Str("pod", WorkerPodName).
Msg("Watching pod, context done.")
return
}
}
}
func (workerSyncer *WorkerSyncer) watchWorkerEvents() {
selfResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", WorkerPodName))
eventWatchHelper := NewEventWatchHelper(workerSyncer.kubernetesProvider, selfResourceRegex, "pod")
eventChan, errorChan := FilteredWatch(workerSyncer.context, eventWatchHelper, []string{workerSyncer.config.SelfNamespace}, eventWatchHelper)
for {
select {
case wEvent, ok := <-eventChan:
if !ok {
eventChan = nil
continue
}
event, err := wEvent.ToEvent()
if err != nil {
log.Error().
Str("pod", WorkerPodName).
Err(err).
Msg("Parsing resource event.")
continue
}
log.Debug().
Str("pod", WorkerPodName).
Str("event", event.Name).
Time("time", event.CreationTimestamp.Time).
Str("name", event.Regarding.Name).
Str("kind", event.Regarding.Kind).
Str("reason", event.Reason).
Str("note", event.Note).
Msg("Watching events.")
pod, err1 := workerSyncer.kubernetesProvider.GetPod(workerSyncer.context, workerSyncer.config.SelfNamespace, event.Regarding.Name)
if err1 != nil {
log.Error().Str("name", event.Regarding.Name).Msg("Couldn't get pod")
continue
}
workerSyncer.WorkerPodsChanges <- pod
case err, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
log.Error().
Str("pod", WorkerPodName).
Err(err).
Msg("While watching events.")
case <-workerSyncer.context.Done():
log.Debug().
Str("pod", WorkerPodName).
Msg("Watching pod events, context done.")
return
}
}
}
func (workerSyncer *WorkerSyncer) watchPodsForTargetting() {
podWatchHelper := NewPodWatchHelper(workerSyncer.kubernetesProvider, &workerSyncer.config.PodFilterRegex)
eventChan, errorChan := FilteredWatch(workerSyncer.context, podWatchHelper, workerSyncer.config.TargetNamespaces, podWatchHelper)
handleChangeInPods := func() {
err, changeFound := workerSyncer.updateCurrentlyTargettedPods()
if err != nil {
workerSyncer.ErrorOut <- K8sTapManagerError{
OriginalError: err,
TapManagerReason: TapManagerPodListError,
}
}
if !changeFound {
log.Debug().Msg("Nothing changed. Updating workers is not needed.")
return
}
if err := workerSyncer.updateWorkers(); err != nil {
workerSyncer.ErrorOut <- K8sTapManagerError{
OriginalError: err,
TapManagerReason: TapManagerWorkerUpdateError,
}
}
}
restartWorkersDebouncer := debounce.NewDebouncer(updateWorkersDelay, handleChangeInPods)
for {
select {
case wEvent, ok := <-eventChan:
if !ok {
eventChan = nil
continue
}
pod, err := wEvent.ToPod()
if err != nil {
workerSyncer.handleErrorInWatchLoop(err, restartWorkersDebouncer)
continue
}
switch wEvent.Type {
case EventAdded:
log.Debug().
Str("pod", pod.Name).
Str("namespace", pod.Namespace).
Msg("Added matching pod.")
if err := restartWorkersDebouncer.SetOn(); err != nil {
log.Error().
Str("pod", pod.Name).
Str("namespace", pod.Namespace).
Err(err).
Msg("While restarting workers!")
}
case EventDeleted:
log.Debug().
Str("pod", pod.Name).
Str("namespace", pod.Namespace).
Msg("Removed matching pod.")
if err := restartWorkersDebouncer.SetOn(); err != nil {
log.Error().
Str("pod", pod.Name).
Str("namespace", pod.Namespace).
Err(err).
Msg("While restarting workers!")
}
case EventModified:
log.Debug().
Str("pod", pod.Name).
Str("namespace", pod.Namespace).
Str("ip", pod.Status.PodIP).
Interface("phase", pod.Status.Phase).
Msg("Modified matching pod.")
// Act only if the modified pod has already obtained an IP address.
// After filtering for IPs, on a normal pod restart this includes the following events:
// - Pod deletion
// - Pod reaches start state
// - Pod reaches ready state
// Ready/unready transitions might also trigger this event.
if pod.Status.PodIP != "" {
if err := restartWorkersDebouncer.SetOn(); err != nil {
log.Error().
Str("pod", pod.Name).
Str("namespace", pod.Namespace).
Err(err).
Msg("While restarting workers!")
}
}
case EventBookmark:
break
case EventError:
break
}
case err, ok := <-errorChan:
if !ok {
errorChan = nil
continue
}
workerSyncer.handleErrorInWatchLoop(err, restartWorkersDebouncer)
continue
case <-workerSyncer.context.Done():
log.Debug().Msg("Watching pods, context done. Stopping \"restart workers debouncer\"")
restartWorkersDebouncer.Cancel()
// TODO: Does this also perform cleanup?
return
}
}
}
func (workerSyncer *WorkerSyncer) handleErrorInWatchLoop(err error, restartWorkersDebouncer *debounce.Debouncer) {
log.Error().Err(err).Msg("While watching pods, got an error! Stopping \"restart workers debouncer\"")
restartWorkersDebouncer.Cancel()
workerSyncer.ErrorOut <- K8sTapManagerError{
OriginalError: err,
TapManagerReason: TapManagerPodWatchError,
}
}
func (workerSyncer *WorkerSyncer) updateCurrentlyTargettedPods() (err error, changesFound bool) {
if matchingPods, err := workerSyncer.kubernetesProvider.ListAllRunningPodsMatchingRegex(workerSyncer.context, &workerSyncer.config.PodFilterRegex, workerSyncer.config.TargetNamespaces); err != nil {
return err, false
} else {
podsToTarget := excludeSelfPods(matchingPods)
addedPods, removedPods := getPodArrayDiff(workerSyncer.CurrentlyTargettedPods, podsToTarget)
for _, addedPod := range addedPods {
log.Info().Str("pod", addedPod.Name).Msg("Currently targetting:")
}
for _, removedPod := range removedPods {
log.Info().Str("pod", removedPod.Name).Msg("Pod is no longer running. Targetting is stopped.")
}
if len(addedPods) > 0 || len(removedPods) > 0 {
workerSyncer.CurrentlyTargettedPods = podsToTarget
workerSyncer.nodeToTargettedPodMap = GetNodeHostToTargettedPodsMap(workerSyncer.CurrentlyTargettedPods)
workerSyncer.TapPodChangesOut <- TargettedPodChangeEvent{
Added: addedPods,
Removed: removedPods,
}
return nil, true
}
return nil, false
}
}
func (workerSyncer *WorkerSyncer) updateWorkers() error {
nodesToTarget := make([]string, len(workerSyncer.nodeToTargettedPodMap))
i := 0
for node := range workerSyncer.nodeToTargettedPodMap {
nodesToTarget[i] = node
i++
}
if utils.EqualStringSlices(nodesToTarget, workerSyncer.targettedNodes) {
log.Debug().Msg("Skipping apply, DaemonSet is up to date")
return nil
}
log.Debug().Strs("nodes", nodesToTarget).Msg("Updating DaemonSet to run on nodes.")
image := docker.GetWorkerImage()
if len(workerSyncer.nodeToTargettedPodMap) > 0 {
var serviceAccountName string
if workerSyncer.config.SelfServiceAccountExists {
serviceAccountName = ServiceAccountName
} else {
serviceAccountName = ""
}
nodeNames := make([]string, 0, len(workerSyncer.nodeToTargettedPodMap))
for nodeName := range workerSyncer.nodeToTargettedPodMap {
nodeNames = append(nodeNames, nodeName)
}
if err := workerSyncer.kubernetesProvider.ApplyWorkerDaemonSet(
workerSyncer.context,
workerSyncer.config.SelfNamespace,
WorkerDaemonSetName,
image,
WorkerPodName,
nodeNames,
serviceAccountName,
workerSyncer.config.WorkerResources,
workerSyncer.config.ImagePullPolicy,
workerSyncer.config.ServiceMesh,
workerSyncer.config.Tls,
workerSyncer.config.Debug); err != nil {
return err
}
log.Debug().Int("worker-count", len(workerSyncer.nodeToTargettedPodMap)).Msg("Successfully created workers.")
} else {
if err := workerSyncer.kubernetesProvider.ResetWorkerDaemonSet(
workerSyncer.context,
workerSyncer.config.SelfNamespace,
WorkerDaemonSetName,
image,
WorkerPodName); err != nil {
return err
}
log.Debug().Msg("Successfully resetted Worker DaemonSet")
}
workerSyncer.targettedNodes = nodesToTarget
return nil
}

kubernetes/workers.go (new file)

@@ -0,0 +1,55 @@
package kubernetes
import (
"context"
"github.com/kubeshark/kubeshark/config/configStructs"
"github.com/kubeshark/kubeshark/docker"
"github.com/rs/zerolog/log"
core "k8s.io/api/core/v1"
)
func CreateWorkers(
kubernetesProvider *Provider,
selfServiceAccountExists bool,
ctx context.Context,
namespace string,
resources configStructs.Resources,
imagePullPolicy core.PullPolicy,
imagePullSecrets []core.LocalObjectReference,
serviceMesh bool,
tls bool,
debug bool,
) error {
image := docker.GetWorkerImage()
var serviceAccountName string
if selfServiceAccountExists {
serviceAccountName = ServiceAccountName
} else {
serviceAccountName = ""
}
log.Info().Msg("Creating the worker DaemonSet...")
if err := kubernetesProvider.ApplyWorkerDaemonSet(
ctx,
namespace,
WorkerDaemonSetName,
image,
WorkerPodName,
serviceAccountName,
resources,
imagePullPolicy,
imagePullSecrets,
serviceMesh,
tls,
debug,
); err != nil {
return err
}
log.Info().Msg("Successfully created the worker DaemonSet.")
return nil
}


@@ -2,6 +2,7 @@ package main
import (
"os"
"strconv"
"time"
"github.com/kubeshark/kubeshark/cmd"
@@ -11,6 +12,20 @@ import (
func main() {
zerolog.SetGlobalLevel(zerolog.InfoLevel)
// Short caller (file:line)
zerolog.CallerMarshalFunc = func(pc uintptr, file string, line int) string {
short := file
for i := len(file) - 1; i > 0; i-- {
if file[i] == '/' {
short = file[i+1:]
break
}
}
file = short
return file + ":" + strconv.Itoa(line)
}
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: time.RFC3339}).With().Caller().Logger()
cmd.Execute()
}
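With the CallerMarshalFunc above, zerolog's Caller() renders a short file:line instead of the full path. A minimal runnable demonstration; the sample output in the trailing comment is illustrative:

package main

import (
    "os"
    "strconv"
    "time"

    "github.com/rs/zerolog"
    "github.com/rs/zerolog/log"
)

func main() {
    // Shorten the caller to its base name, as in the diff above.
    zerolog.CallerMarshalFunc = func(pc uintptr, file string, line int) string {
        short := file
        for i := len(file) - 1; i > 0; i-- {
            if file[i] == '/' {
                short = file[i+1:]
                break
            }
        }
        return short + ":" + strconv.Itoa(line)
    }
    log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: time.RFC3339}).With().Caller().Logger()
    log.Info().Msg("hello")
    // Output resembles: 2023-03-17T00:00:00Z INF main.go:26 > hello
}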

misc/scripting.go (new file)

@@ -0,0 +1,55 @@
package misc
import (
"os"
"path/filepath"
"github.com/robertkrimen/otto/ast"
"github.com/robertkrimen/otto/file"
"github.com/robertkrimen/otto/parser"
)
type Script struct {
Path string `json:"path"`
Title string `json:"title"`
Code string `json:"code"`
}
func ReadScriptFile(path string) (script *Script, err error) {
filename := filepath.Base(path)
var body []byte
body, err = os.ReadFile(path)
if err != nil {
return
}
content := string(body)
var program *ast.Program
program, err = parser.ParseFile(nil, filename, content, parser.StoreComments)
if err != nil {
return
}
var title string
var titleIsSet bool
code := content
var idx0 file.Idx
for node, comments := range program.Comments {
if (titleIsSet && node.Idx0() > idx0) || len(comments) == 0 {
continue
}
idx0 = node.Idx0()
title = comments[0].Text
titleIsSet = true
}
script = &Script{
Path: path,
Title: title,
Code: code,
}
return
}
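ReadScriptFile derives the script's title from the earliest comment in the file and keeps the full source as Code. A usage sketch; the path and comment text are placeholders:

package main

import (
    "fmt"

    "github.com/kubeshark/kubeshark/misc"
)

func main() {
    // Assuming /tmp/log_http.js begins with: // Log every HTTP item
    script, err := misc.ReadScriptFile("/tmp/log_http.js")
    if err != nil {
        panic(err)
    }
    fmt.Println(script.Title)              // prints the first comment's text
    fmt.Println(len(script.Code), "bytes") // the full source is kept as Code
}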


@@ -113,11 +113,6 @@ func cleanUpRestrictedMode(ctx context.Context, kubernetesProvider *kubernetes.P
handleDeletionError(err, resourceDesc, &leftoverResources)
}
if err := kubernetesProvider.RemoveConfigMap(ctx, selfResourcesNamespace, kubernetes.ConfigMapName); err != nil {
resourceDesc := fmt.Sprintf("ConfigMap %s in namespace %s", kubernetes.ConfigMapName, selfResourcesNamespace)
handleDeletionError(err, resourceDesc, &leftoverResources)
}
if resources, err := kubernetesProvider.ListManagedServiceAccounts(ctx, selfResourcesNamespace); err != nil {
resourceDesc := fmt.Sprintf("ServiceAccounts in namespace %s", selfResourcesNamespace)
handleDeletionError(err, resourceDesc, &leftoverResources)


@@ -5,6 +5,7 @@ import (
"fmt"
"github.com/kubeshark/kubeshark/config"
"github.com/kubeshark/kubeshark/config/configStructs"
"github.com/kubeshark/kubeshark/docker"
"github.com/kubeshark/kubeshark/errormessage"
"github.com/kubeshark/kubeshark/kubernetes"
@@ -13,10 +14,10 @@ import (
core "k8s.io/api/core/v1"
)
func CreateHubResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, isNsRestrictedMode bool, selfNamespace string, hubResources kubernetes.Resources, imagePullPolicy core.PullPolicy, debug bool) (bool, error) {
func CreateHubResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, isNsRestrictedMode bool, selfNamespace string, hubResources configStructs.Resources, imagePullPolicy core.PullPolicy, imagePullSecrets []core.LocalObjectReference, debug bool) (bool, error) {
if !isNsRestrictedMode {
if err := createSelfNamespace(ctx, kubernetesProvider, selfNamespace); err != nil {
return false, err
log.Debug().Err(err).Send()
}
}
@@ -39,6 +40,7 @@ func CreateHubResources(ctx context.Context, kubernetesProvider *kubernetes.Prov
ServiceAccountName: serviceAccountName,
Resources: hubResources,
ImagePullPolicy: imagePullPolicy,
ImagePullSecrets: imagePullSecrets,
Debug: debug,
}
@@ -49,6 +51,7 @@ func CreateHubResources(ctx context.Context, kubernetesProvider *kubernetes.Prov
ServiceAccountName: serviceAccountName,
Resources: hubResources,
ImagePullPolicy: imagePullPolicy,
ImagePullSecrets: imagePullSecrets,
Debug: debug,
}


@@ -5,8 +5,8 @@ const (
Red = "\033[1;31m%s\033[0m"
Green = "\033[1;32m%s\033[0m"
Yellow = "\033[1;33m%s\033[0m"
Purple = "\033[1;34m%s\033[0m"
Blue = "\033[1;34m%s\033[0m"
Magenta = "\033[1;35m%s\033[0m"
Teal = "\033[1;36m%s\033[0m"
Cyan = "\033[1;36m%s\033[0m"
White = "\033[1;37m%s\033[0m"
)