mirror of
https://github.com/kubeshark/kubeshark.git
synced 2026-04-06 10:47:49 +00:00
prevPodPhase does not take into account the fact that there may be more than one tapper pod. Therefore it is not clear what its value represents. It is only used in a debug print. It is not worth the effort to fix for that one debug print. Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
851 lines
32 KiB
Go
851 lines
32 KiB
Go
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"regexp"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/getkin/kin-openapi/openapi3"
|
|
"gopkg.in/yaml.v3"
|
|
core "k8s.io/api/core/v1"
|
|
k8serrors "k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
|
|
"github.com/up9inc/mizu/cli/apiserver"
|
|
"github.com/up9inc/mizu/cli/cmd/goUtils"
|
|
"github.com/up9inc/mizu/cli/config"
|
|
"github.com/up9inc/mizu/cli/config/configStructs"
|
|
"github.com/up9inc/mizu/cli/errormessage"
|
|
"github.com/up9inc/mizu/cli/mizu"
|
|
"github.com/up9inc/mizu/cli/mizu/fsUtils"
|
|
"github.com/up9inc/mizu/cli/uiUtils"
|
|
"github.com/up9inc/mizu/shared"
|
|
"github.com/up9inc/mizu/shared/kubernetes"
|
|
"github.com/up9inc/mizu/shared/logger"
|
|
"github.com/up9inc/mizu/tap/api"
|
|
)
|
|
|
|
const cleanupTimeout = time.Minute
|
|
|
|
type tapState struct {
|
|
apiServerService *core.Service
|
|
tapperSyncer *kubernetes.MizuTapperSyncer
|
|
mizuServiceAccountExists bool
|
|
}
|
|
|
|
var state tapState
|
|
var apiProvider *apiserver.Provider
|
|
|
|
func RunMizuTap() {
|
|
startTime := time.Now()
|
|
|
|
mizuApiFilteringOptions, err := getMizuApiFilteringOptions()
|
|
apiProvider = apiserver.NewProvider(GetApiServerUrl(), apiserver.DefaultRetries, apiserver.DefaultTimeout)
|
|
if err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error parsing regex-masking: %v", errormessage.FormatError(err)))
|
|
return
|
|
}
|
|
|
|
var serializedValidationRules string
|
|
if config.Config.Tap.EnforcePolicyFile != "" {
|
|
serializedValidationRules, err = readValidationRules(config.Config.Tap.EnforcePolicyFile)
|
|
if err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error reading policy file: %v", errormessage.FormatError(err)))
|
|
return
|
|
}
|
|
}
|
|
|
|
// Read and validate the OAS file
|
|
var serializedContract string
|
|
if config.Config.Tap.ContractFile != "" {
|
|
bytes, err := ioutil.ReadFile(config.Config.Tap.ContractFile)
|
|
if err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error reading contract file: %v", errormessage.FormatError(err)))
|
|
return
|
|
}
|
|
serializedContract = string(bytes)
|
|
|
|
ctx := context.Background()
|
|
loader := &openapi3.Loader{Context: ctx}
|
|
doc, err := loader.LoadFromData(bytes)
|
|
if err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error loading contract file: %v", errormessage.FormatError(err)))
|
|
return
|
|
}
|
|
err = doc.Validate(ctx)
|
|
if err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error validating contract file: %v", errormessage.FormatError(err)))
|
|
return
|
|
}
|
|
}
|
|
|
|
kubernetesProvider, err := getKubernetesProviderForCli()
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel() // cancel will be called when this function exits
|
|
|
|
targetNamespaces := getNamespaces(kubernetesProvider)
|
|
|
|
serializedMizuConfig, err := config.GetSerializedMizuAgentConfig(targetNamespaces, mizuApiFilteringOptions)
|
|
if err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error composing mizu config: %v", errormessage.FormatError(err)))
|
|
return
|
|
}
|
|
|
|
if config.Config.IsNsRestrictedMode() {
|
|
if len(targetNamespaces) != 1 || !shared.Contains(targetNamespaces, config.Config.MizuResourcesNamespace) {
|
|
logger.Log.Errorf("Not supported mode. Mizu can't resolve IPs in other namespaces when running in namespace restricted mode.\n"+
|
|
"You can use the same namespace for --%s and --%s", configStructs.NamespacesTapName, config.MizuResourcesNamespaceConfigName)
|
|
return
|
|
}
|
|
}
|
|
|
|
var namespacesStr string
|
|
if !shared.Contains(targetNamespaces, kubernetes.K8sAllNamespaces) {
|
|
namespacesStr = fmt.Sprintf("namespaces \"%s\"", strings.Join(targetNamespaces, "\", \""))
|
|
} else {
|
|
namespacesStr = "all namespaces"
|
|
}
|
|
|
|
logger.Log.Infof("Tapping pods in %s", namespacesStr)
|
|
|
|
if config.Config.Tap.DryRun {
|
|
if err := printTappedPodsPreview(ctx, kubernetesProvider, targetNamespaces); err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error listing pods: %v", errormessage.FormatError(err)))
|
|
}
|
|
return
|
|
}
|
|
|
|
if err := createMizuResources(ctx, cancel, kubernetesProvider, serializedValidationRules, serializedContract, serializedMizuConfig); err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error creating resources: %v", errormessage.FormatError(err)))
|
|
|
|
var statusError *k8serrors.StatusError
|
|
if errors.As(err, &statusError) {
|
|
if statusError.ErrStatus.Reason == metav1.StatusReasonAlreadyExists {
|
|
logger.Log.Info("Mizu is already running in this namespace, change the `mizu-resources-namespace` configuration or run `mizu clean` to remove the currently running Mizu instance")
|
|
}
|
|
}
|
|
return
|
|
}
|
|
if config.Config.Tap.DaemonMode {
|
|
if err := handleDaemonModePostCreation(ctx, cancel, kubernetesProvider, targetNamespaces); err != nil {
|
|
defer finishMizuExecution(kubernetesProvider, apiProvider)
|
|
cancel()
|
|
} else {
|
|
logger.Log.Infof(uiUtils.Magenta, "Mizu is now running in daemon mode, run `mizu view` to connect to the mizu daemon instance")
|
|
}
|
|
} else {
|
|
defer finishMizuExecution(kubernetesProvider, apiProvider)
|
|
|
|
if err = startTapperSyncer(ctx, cancel, kubernetesProvider, targetNamespaces, *mizuApiFilteringOptions); err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error starting mizu tapper syncer: %v", err))
|
|
cancel()
|
|
}
|
|
|
|
go goUtils.HandleExcWrapper(watchApiServerPod, ctx, kubernetesProvider, cancel)
|
|
go goUtils.HandleExcWrapper(watchTapperPod, ctx, kubernetesProvider, cancel)
|
|
go goUtils.HandleExcWrapper(watchMizuEvents, ctx, kubernetesProvider, cancel, startTime)
|
|
|
|
// block until exit signal or error
|
|
waitForFinish(ctx, cancel)
|
|
}
|
|
}
|
|
|
|
func handleDaemonModePostCreation(ctx context.Context, cancel context.CancelFunc, kubernetesProvider *kubernetes.Provider, namespaces []string) error {
|
|
if err := printTappedPodsPreview(ctx, kubernetesProvider, namespaces); err != nil {
|
|
return err
|
|
}
|
|
|
|
apiProvider := apiserver.NewProvider(GetApiServerUrl(), 90, 1*time.Second)
|
|
|
|
if err := waitForDaemonModeToBeReady(cancel, kubernetesProvider, apiProvider); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
/*
|
|
this function is a bit problematic as it might be detached from the actual pods the mizu api server will tap.
|
|
The alternative would be to wait for api server to be ready and then query it for the pods it listens to, this has
|
|
the arguably worse drawback of taking a relatively very long time before the user sees which pods are targeted, if any.
|
|
*/
|
|
func printTappedPodsPreview(ctx context.Context, kubernetesProvider *kubernetes.Provider, namespaces []string) error {
|
|
if matchingPods, err := kubernetesProvider.ListAllRunningPodsMatchingRegex(ctx, config.Config.Tap.PodRegex(), namespaces); err != nil {
|
|
return err
|
|
} else {
|
|
if len(matchingPods) == 0 {
|
|
printNoPodsFoundSuggestion(namespaces)
|
|
}
|
|
logger.Log.Info("Pods that match the provided criteria at this instant:")
|
|
for _, tappedPod := range matchingPods {
|
|
logger.Log.Infof(uiUtils.Green, fmt.Sprintf("+%s", tappedPod.Name))
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func waitForDaemonModeToBeReady(cancel context.CancelFunc, kubernetesProvider *kubernetes.Provider, apiProvider *apiserver.Provider) error {
|
|
logger.Log.Info("Waiting for mizu to be ready... (may take a few minutes)")
|
|
go startProxyReportErrorIfAny(kubernetesProvider, cancel)
|
|
|
|
// TODO: TRA-3903 add a smarter test to see that tapping/pod watching is functioning properly
|
|
if err := apiProvider.TestConnection(); err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Mizu was not ready in time, for more info check logs at %s", fsUtils.GetLogFilePath()))
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func startTapperSyncer(ctx context.Context, cancel context.CancelFunc, provider *kubernetes.Provider, targetNamespaces []string, mizuApiFilteringOptions api.TrafficFilteringOptions) error {
|
|
tapperSyncer, err := kubernetes.CreateAndStartMizuTapperSyncer(ctx, provider, kubernetes.TapperSyncerConfig{
|
|
TargetNamespaces: targetNamespaces,
|
|
PodFilterRegex: *config.Config.Tap.PodRegex(),
|
|
MizuResourcesNamespace: config.Config.MizuResourcesNamespace,
|
|
AgentImage: config.Config.AgentImage,
|
|
TapperResources: config.Config.Tap.TapperResources,
|
|
ImagePullPolicy: config.Config.ImagePullPolicy(),
|
|
LogLevel: config.Config.LogLevel(),
|
|
IgnoredUserAgents: config.Config.Tap.IgnoredUserAgents,
|
|
MizuApiFilteringOptions: mizuApiFilteringOptions,
|
|
MizuServiceAccountExists: state.mizuServiceAccountExists,
|
|
Istio: config.Config.Tap.Istio,
|
|
})
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, tappedPod := range tapperSyncer.CurrentlyTappedPods {
|
|
logger.Log.Infof(uiUtils.Green, fmt.Sprintf("+%s", tappedPod.Name))
|
|
}
|
|
|
|
if len(tapperSyncer.CurrentlyTappedPods) == 0 {
|
|
printNoPodsFoundSuggestion(targetNamespaces)
|
|
}
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case syncerErr, ok := <-tapperSyncer.ErrorOut:
|
|
if !ok {
|
|
logger.Log.Debug("mizuTapperSyncer err channel closed, ending listener loop")
|
|
return
|
|
}
|
|
logger.Log.Errorf(uiUtils.Error, getErrorDisplayTextForK8sTapManagerError(syncerErr))
|
|
cancel()
|
|
case _, ok := <-tapperSyncer.TapPodChangesOut:
|
|
if !ok {
|
|
logger.Log.Debug("mizuTapperSyncer pod changes channel closed, ending listener loop")
|
|
return
|
|
}
|
|
if err := apiProvider.ReportTappedPods(tapperSyncer.CurrentlyTappedPods); err != nil {
|
|
logger.Log.Debugf("[Error] failed update tapped pods %v", err)
|
|
}
|
|
case <-ctx.Done():
|
|
logger.Log.Debug("mizuTapperSyncer event listener loop exiting due to context done")
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
state.tapperSyncer = tapperSyncer
|
|
|
|
return nil
|
|
}
|
|
|
|
func printNoPodsFoundSuggestion(targetNamespaces []string) {
|
|
var suggestionStr string
|
|
if !shared.Contains(targetNamespaces, kubernetes.K8sAllNamespaces) {
|
|
suggestionStr = ". You can also try selecting a different namespace with -n or tap all namespaces with -A"
|
|
}
|
|
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Did not find any currently running pods that match the regex argument, mizu will automatically tap matching pods if any are created later%s", suggestionStr))
|
|
}
|
|
|
|
func getErrorDisplayTextForK8sTapManagerError(err kubernetes.K8sTapManagerError) string {
|
|
switch err.TapManagerReason {
|
|
case kubernetes.TapManagerPodListError:
|
|
return fmt.Sprintf("Failed to update currently tapped pods: %v", err.OriginalError)
|
|
case kubernetes.TapManagerPodWatchError:
|
|
return fmt.Sprintf("Error occured in k8s pod watch: %v", err.OriginalError)
|
|
case kubernetes.TapManagerTapperUpdateError:
|
|
return fmt.Sprintf("Error updating tappers: %v", err.OriginalError)
|
|
default:
|
|
return fmt.Sprintf("Unknown error occured in k8s tap manager: %v", err.OriginalError)
|
|
}
|
|
}
|
|
|
|
func readValidationRules(file string) (string, error) {
|
|
rules, err := shared.DecodeEnforcePolicy(file)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
newContent, _ := yaml.Marshal(&rules)
|
|
return string(newContent), nil
|
|
}
|
|
|
|
func createMizuResources(ctx context.Context, cancel context.CancelFunc, kubernetesProvider *kubernetes.Provider, serializedValidationRules string, serializedContract string, serializedMizuConfig string) error {
|
|
if !config.Config.IsNsRestrictedMode() {
|
|
if err := createMizuNamespace(ctx, kubernetesProvider); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if err := createMizuConfigmap(ctx, kubernetesProvider, serializedValidationRules, serializedContract, serializedMizuConfig); err != nil {
|
|
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Failed to create resources required for policy validation. Mizu will not validate policy rules. error: %v", errormessage.FormatError(err)))
|
|
}
|
|
|
|
var err error
|
|
state.mizuServiceAccountExists, err = createRBACIfNecessary(ctx, kubernetesProvider)
|
|
if err != nil {
|
|
if !config.Config.Tap.DaemonMode {
|
|
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Failed to ensure the resources required for IP resolving. Mizu will not resolve target IPs to names. error: %v", errormessage.FormatError(err)))
|
|
}
|
|
}
|
|
|
|
var serviceAccountName string
|
|
if state.mizuServiceAccountExists {
|
|
serviceAccountName = kubernetes.ServiceAccountName
|
|
} else {
|
|
serviceAccountName = ""
|
|
}
|
|
|
|
opts := &kubernetes.ApiServerOptions{
|
|
Namespace: config.Config.MizuResourcesNamespace,
|
|
PodName: kubernetes.ApiServerPodName,
|
|
PodImage: config.Config.AgentImage,
|
|
ServiceAccountName: serviceAccountName,
|
|
IsNamespaceRestricted: config.Config.IsNsRestrictedMode(),
|
|
SyncEntriesConfig: getSyncEntriesConfig(),
|
|
MaxEntriesDBSizeBytes: config.Config.Tap.MaxEntriesDBSizeBytes(),
|
|
Resources: config.Config.Tap.ApiServerResources,
|
|
ImagePullPolicy: config.Config.ImagePullPolicy(),
|
|
LogLevel: config.Config.LogLevel(),
|
|
}
|
|
|
|
if config.Config.Tap.DaemonMode {
|
|
if !state.mizuServiceAccountExists {
|
|
defer cleanUpMizuResources(ctx, cancel, kubernetesProvider)
|
|
logger.Log.Fatalf(uiUtils.Red, fmt.Sprintf("Failed to ensure the resources required for mizu to run in daemon mode. cannot proceed. error: %v", errormessage.FormatError(err)))
|
|
}
|
|
if err := createMizuApiServerDeployment(ctx, kubernetesProvider, opts); err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
if err := createMizuApiServerPod(ctx, kubernetesProvider, opts); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
state.apiServerService, err = kubernetesProvider.CreateService(ctx, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName, kubernetes.ApiServerPodName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
logger.Log.Debugf("Successfully created service: %s", kubernetes.ApiServerPodName)
|
|
|
|
return nil
|
|
}
|
|
|
|
func createMizuConfigmap(ctx context.Context, kubernetesProvider *kubernetes.Provider, serializedValidationRules string, serializedContract string, serializedMizuConfig string) error {
|
|
err := kubernetesProvider.CreateConfigMap(ctx, config.Config.MizuResourcesNamespace, kubernetes.ConfigMapName, serializedValidationRules, serializedContract, serializedMizuConfig)
|
|
return err
|
|
}
|
|
|
|
func createMizuNamespace(ctx context.Context, kubernetesProvider *kubernetes.Provider) error {
|
|
_, err := kubernetesProvider.CreateNamespace(ctx, config.Config.MizuResourcesNamespace)
|
|
return err
|
|
}
|
|
|
|
func createMizuApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, opts *kubernetes.ApiServerOptions) error {
|
|
pod, err := kubernetesProvider.GetMizuApiServerPodObject(opts, false, "")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if _, err = kubernetesProvider.CreatePod(ctx, config.Config.MizuResourcesNamespace, pod); err != nil {
|
|
return err
|
|
}
|
|
logger.Log.Debugf("Successfully created API server pod: %s", kubernetes.ApiServerPodName)
|
|
return nil
|
|
}
|
|
|
|
func createMizuApiServerDeployment(ctx context.Context, kubernetesProvider *kubernetes.Provider, opts *kubernetes.ApiServerOptions) error {
|
|
volumeClaimCreated := false
|
|
if !config.Config.Tap.NoPersistentVolumeClaim {
|
|
volumeClaimCreated = TryToCreatePersistentVolumeClaim(ctx, kubernetesProvider)
|
|
}
|
|
|
|
pod, err := kubernetesProvider.GetMizuApiServerPodObject(opts, volumeClaimCreated, kubernetes.PersistentVolumeClaimName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, err = kubernetesProvider.CreateDeployment(ctx, config.Config.MizuResourcesNamespace, opts.PodName, pod); err != nil {
|
|
return err
|
|
}
|
|
logger.Log.Debugf("Successfully created API server deployment: %s", kubernetes.ApiServerPodName)
|
|
return nil
|
|
}
|
|
|
|
func TryToCreatePersistentVolumeClaim(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool {
|
|
isDefaultStorageClassAvailable, err := kubernetesProvider.IsDefaultStorageProviderAvailable(ctx)
|
|
if err != nil {
|
|
logger.Log.Warningf(uiUtils.Yellow, "An error occured when checking if a default storage provider exists in this cluster, this means mizu data will be lost on mizu-api-server pod restart")
|
|
logger.Log.Debugf("error checking if default storage class exists: %v", err)
|
|
return false
|
|
} else if !isDefaultStorageClassAvailable {
|
|
logger.Log.Warningf(uiUtils.Yellow, "Could not find default storage provider in this cluster, this means mizu data will be lost on mizu-api-server pod restart")
|
|
return false
|
|
}
|
|
|
|
if _, err = kubernetesProvider.CreatePersistentVolumeClaim(ctx, config.Config.MizuResourcesNamespace, kubernetes.PersistentVolumeClaimName, config.Config.Tap.MaxEntriesDBSizeBytes()+mizu.DaemonModePersistentVolumeSizeBufferBytes); err != nil {
|
|
logger.Log.Warningf(uiUtils.Yellow, "An error has occured while creating a persistent volume claim for mizu, this means mizu data will be lost on mizu-api-server pod restart")
|
|
logger.Log.Debugf("error creating persistent volume claim: %v", err)
|
|
return false
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func getMizuApiFilteringOptions() (*api.TrafficFilteringOptions, error) {
|
|
var compiledRegexSlice []*api.SerializableRegexp
|
|
|
|
if config.Config.Tap.PlainTextFilterRegexes != nil && len(config.Config.Tap.PlainTextFilterRegexes) > 0 {
|
|
compiledRegexSlice = make([]*api.SerializableRegexp, 0)
|
|
for _, regexStr := range config.Config.Tap.PlainTextFilterRegexes {
|
|
compiledRegex, err := api.CompileRegexToSerializableRegexp(regexStr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
compiledRegexSlice = append(compiledRegexSlice, compiledRegex)
|
|
}
|
|
}
|
|
|
|
return &api.TrafficFilteringOptions{
|
|
PlainTextMaskingRegexes: compiledRegexSlice,
|
|
IgnoredUserAgents: config.Config.Tap.IgnoredUserAgents,
|
|
DisableRedaction: config.Config.Tap.DisableRedaction,
|
|
}, nil
|
|
}
|
|
|
|
func getSyncEntriesConfig() *shared.SyncEntriesConfig {
|
|
if !config.Config.Tap.Analysis && config.Config.Tap.Workspace == "" {
|
|
return nil
|
|
}
|
|
|
|
return &shared.SyncEntriesConfig{
|
|
Token: config.Config.Auth.Token,
|
|
Env: config.Config.Auth.EnvName,
|
|
Workspace: config.Config.Tap.Workspace,
|
|
UploadIntervalSec: config.Config.Tap.UploadIntervalSec,
|
|
}
|
|
}
|
|
|
|
func cleanUpRestrictedMode(ctx context.Context, kubernetesProvider *kubernetes.Provider) []string {
|
|
leftoverResources := make([]string, 0)
|
|
|
|
if err := kubernetesProvider.RemoveService(ctx, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName); err != nil {
|
|
resourceDesc := fmt.Sprintf("Service %s in namespace %s", kubernetes.ApiServerPodName, config.Config.MizuResourcesNamespace)
|
|
handleDeletionError(err, resourceDesc, &leftoverResources)
|
|
}
|
|
|
|
if err := kubernetesProvider.RemoveDaemonSet(ctx, config.Config.MizuResourcesNamespace, kubernetes.TapperDaemonSetName); err != nil {
|
|
resourceDesc := fmt.Sprintf("DaemonSet %s in namespace %s", kubernetes.TapperDaemonSetName, config.Config.MizuResourcesNamespace)
|
|
handleDeletionError(err, resourceDesc, &leftoverResources)
|
|
}
|
|
|
|
if err := kubernetesProvider.RemoveConfigMap(ctx, config.Config.MizuResourcesNamespace, kubernetes.ConfigMapName); err != nil {
|
|
resourceDesc := fmt.Sprintf("ConfigMap %s in namespace %s", kubernetes.ConfigMapName, config.Config.MizuResourcesNamespace)
|
|
handleDeletionError(err, resourceDesc, &leftoverResources)
|
|
}
|
|
|
|
if err := kubernetesProvider.RemoveServicAccount(ctx, config.Config.MizuResourcesNamespace, kubernetes.ServiceAccountName); err != nil {
|
|
resourceDesc := fmt.Sprintf("Service Account %s in namespace %s", kubernetes.ServiceAccountName, config.Config.MizuResourcesNamespace)
|
|
handleDeletionError(err, resourceDesc, &leftoverResources)
|
|
}
|
|
|
|
if err := kubernetesProvider.RemoveRole(ctx, config.Config.MizuResourcesNamespace, kubernetes.RoleName); err != nil {
|
|
resourceDesc := fmt.Sprintf("Role %s in namespace %s", kubernetes.RoleName, config.Config.MizuResourcesNamespace)
|
|
handleDeletionError(err, resourceDesc, &leftoverResources)
|
|
}
|
|
|
|
if err := kubernetesProvider.RemovePod(ctx, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName); err != nil {
|
|
resourceDesc := fmt.Sprintf("Pod %s in namespace %s", kubernetes.ApiServerPodName, config.Config.MizuResourcesNamespace)
|
|
handleDeletionError(err, resourceDesc, &leftoverResources)
|
|
}
|
|
|
|
//daemon mode resources
|
|
if err := kubernetesProvider.RemoveRoleBinding(ctx, config.Config.MizuResourcesNamespace, kubernetes.RoleBindingName); err != nil {
|
|
resourceDesc := fmt.Sprintf("RoleBinding %s in namespace %s", kubernetes.RoleBindingName, config.Config.MizuResourcesNamespace)
|
|
handleDeletionError(err, resourceDesc, &leftoverResources)
|
|
}
|
|
|
|
if err := kubernetesProvider.RemoveDeployment(ctx, config.Config.MizuResourcesNamespace, kubernetes.ApiServerPodName); err != nil {
|
|
resourceDesc := fmt.Sprintf("Deployment %s in namespace %s", kubernetes.ApiServerPodName, config.Config.MizuResourcesNamespace)
|
|
handleDeletionError(err, resourceDesc, &leftoverResources)
|
|
}
|
|
|
|
if err := kubernetesProvider.RemovePersistentVolumeClaim(ctx, config.Config.MizuResourcesNamespace, kubernetes.PersistentVolumeClaimName); err != nil {
|
|
resourceDesc := fmt.Sprintf("PersistentVolumeClaim %s in namespace %s", kubernetes.PersistentVolumeClaimName, config.Config.MizuResourcesNamespace)
|
|
handleDeletionError(err, resourceDesc, &leftoverResources)
|
|
}
|
|
|
|
if err := kubernetesProvider.RemoveRole(ctx, config.Config.MizuResourcesNamespace, kubernetes.DaemonRoleName); err != nil {
|
|
resourceDesc := fmt.Sprintf("Role %s in namespace %s", kubernetes.DaemonRoleName, config.Config.MizuResourcesNamespace)
|
|
handleDeletionError(err, resourceDesc, &leftoverResources)
|
|
}
|
|
|
|
if err := kubernetesProvider.RemoveRoleBinding(ctx, config.Config.MizuResourcesNamespace, kubernetes.DaemonRoleBindingName); err != nil {
|
|
resourceDesc := fmt.Sprintf("RoleBinding %s in namespace %s", kubernetes.DaemonRoleBindingName, config.Config.MizuResourcesNamespace)
|
|
handleDeletionError(err, resourceDesc, &leftoverResources)
|
|
}
|
|
|
|
return leftoverResources
|
|
}
|
|
|
|
func cleanUpNonRestrictedMode(ctx context.Context, cancel context.CancelFunc, kubernetesProvider *kubernetes.Provider) []string {
|
|
leftoverResources := make([]string, 0)
|
|
|
|
if err := kubernetesProvider.RemoveNamespace(ctx, config.Config.MizuResourcesNamespace); err != nil {
|
|
resourceDesc := fmt.Sprintf("Namespace %s", config.Config.MizuResourcesNamespace)
|
|
handleDeletionError(err, resourceDesc, &leftoverResources)
|
|
} else {
|
|
defer waitUntilNamespaceDeleted(ctx, cancel, kubernetesProvider)
|
|
}
|
|
|
|
if err := kubernetesProvider.RemoveClusterRole(ctx, kubernetes.ClusterRoleName); err != nil {
|
|
resourceDesc := fmt.Sprintf("ClusterRole %s", kubernetes.ClusterRoleName)
|
|
handleDeletionError(err, resourceDesc, &leftoverResources)
|
|
}
|
|
|
|
if err := kubernetesProvider.RemoveClusterRoleBinding(ctx, kubernetes.ClusterRoleBindingName); err != nil {
|
|
resourceDesc := fmt.Sprintf("ClusterRoleBinding %s", kubernetes.ClusterRoleBindingName)
|
|
handleDeletionError(err, resourceDesc, &leftoverResources)
|
|
}
|
|
|
|
return leftoverResources
|
|
}
|
|
|
|
func handleDeletionError(err error, resourceDesc string, leftoverResources *[]string) {
|
|
logger.Log.Debugf("Error removing %s: %v", resourceDesc, errormessage.FormatError(err))
|
|
*leftoverResources = append(*leftoverResources, resourceDesc)
|
|
}
|
|
|
|
func waitUntilNamespaceDeleted(ctx context.Context, cancel context.CancelFunc, kubernetesProvider *kubernetes.Provider) {
|
|
// Call cancel if a terminating signal was received. Allows user to skip the wait.
|
|
go func() {
|
|
waitForFinish(ctx, cancel)
|
|
}()
|
|
|
|
if err := kubernetesProvider.WaitUtilNamespaceDeleted(ctx, config.Config.MizuResourcesNamespace); err != nil {
|
|
switch {
|
|
case ctx.Err() == context.Canceled:
|
|
logger.Log.Debugf("Do nothing. User interrupted the wait")
|
|
case err == wait.ErrWaitTimeout:
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Timeout while removing Namespace %s", config.Config.MizuResourcesNamespace))
|
|
default:
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error while waiting for Namespace %s to be deleted: %v", config.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
|
|
}
|
|
}
|
|
}
|
|
|
|
func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
|
|
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", kubernetes.ApiServerPodName))
|
|
podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
|
|
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper)
|
|
isPodReady := false
|
|
timeAfter := time.After(25 * time.Second)
|
|
for {
|
|
select {
|
|
case _, ok := <-added:
|
|
if !ok {
|
|
added = nil
|
|
continue
|
|
}
|
|
|
|
logger.Log.Debugf("Watching API Server pod loop, added")
|
|
case _, ok := <-removed:
|
|
if !ok {
|
|
removed = nil
|
|
continue
|
|
}
|
|
|
|
logger.Log.Infof("%s removed", kubernetes.ApiServerPodName)
|
|
cancel()
|
|
return
|
|
case wEvent, ok := <-modified:
|
|
if !ok {
|
|
modified = nil
|
|
continue
|
|
}
|
|
|
|
modifiedPod, err := wEvent.ToPod()
|
|
if err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, err)
|
|
cancel()
|
|
continue
|
|
}
|
|
|
|
logger.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase)
|
|
|
|
if modifiedPod.Status.Phase == core.PodPending {
|
|
if modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue {
|
|
logger.Log.Debugf("Wasn't able to deploy the API server. Reason: \"%s\"", modifiedPod.Status.Conditions[0].Message)
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server, for more info check logs at %s", fsUtils.GetLogFilePath()))
|
|
cancel()
|
|
break
|
|
}
|
|
|
|
if len(modifiedPod.Status.ContainerStatuses) > 0 && modifiedPod.Status.ContainerStatuses[0].State.Waiting != nil && modifiedPod.Status.ContainerStatuses[0].State.Waiting.Reason == "ErrImagePull" {
|
|
logger.Log.Debugf("Wasn't able to deploy the API server. (ErrImagePull) Reason: \"%s\"", modifiedPod.Status.ContainerStatuses[0].State.Waiting.Message)
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server: failed to pull the image, for more info check logs at %v", fsUtils.GetLogFilePath()))
|
|
cancel()
|
|
break
|
|
}
|
|
}
|
|
|
|
if modifiedPod.Status.Phase == core.PodRunning && !isPodReady {
|
|
isPodReady = true
|
|
go startProxyReportErrorIfAny(kubernetesProvider, cancel)
|
|
|
|
url := GetApiServerUrl()
|
|
if err := apiProvider.TestConnection(); err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", fsUtils.GetLogFilePath()))
|
|
cancel()
|
|
break
|
|
}
|
|
|
|
logger.Log.Infof("Mizu is available at %s", url)
|
|
if !config.Config.HeadlessMode {
|
|
uiUtils.OpenBrowser(url)
|
|
}
|
|
if err := apiProvider.ReportTappedPods(state.tapperSyncer.CurrentlyTappedPods); err != nil {
|
|
logger.Log.Debugf("[Error] failed update tapped pods %v", err)
|
|
}
|
|
}
|
|
case err, ok := <-errorChan:
|
|
if !ok {
|
|
errorChan = nil
|
|
continue
|
|
}
|
|
|
|
logger.Log.Errorf("[ERROR] Agent creation, watching %v namespace, error: %v", config.Config.MizuResourcesNamespace, err)
|
|
cancel()
|
|
|
|
case <-timeAfter:
|
|
if !isPodReady {
|
|
logger.Log.Errorf(uiUtils.Error, "Mizu API server was not ready in time")
|
|
cancel()
|
|
}
|
|
case <-ctx.Done():
|
|
logger.Log.Debugf("Watching API Server pod loop, ctx done")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
|
|
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.TapperDaemonSetName))
|
|
podWatchHelper := kubernetes.NewPodWatchHelper(kubernetesProvider, podExactRegex)
|
|
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, podWatchHelper, []string{config.Config.MizuResourcesNamespace}, podWatchHelper)
|
|
for {
|
|
select {
|
|
case wEvent, ok := <-added:
|
|
if !ok {
|
|
added = nil
|
|
continue
|
|
}
|
|
|
|
addedPod, err := wEvent.ToPod()
|
|
if err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, err)
|
|
cancel()
|
|
continue
|
|
}
|
|
|
|
logger.Log.Debugf("Tapper is created [%s]", addedPod.Name)
|
|
case wEvent, ok := <-removed:
|
|
if !ok {
|
|
removed = nil
|
|
continue
|
|
}
|
|
|
|
removedPod, err := wEvent.ToPod()
|
|
if err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, err)
|
|
cancel()
|
|
continue
|
|
}
|
|
|
|
|
|
logger.Log.Debugf("Tapper is removed [%s]", removedPod.Name)
|
|
case wEvent, ok := <-modified:
|
|
if !ok {
|
|
modified = nil
|
|
continue
|
|
}
|
|
|
|
modifiedPod, err := wEvent.ToPod()
|
|
if err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, err)
|
|
cancel()
|
|
continue
|
|
}
|
|
|
|
if modifiedPod.Status.Phase == core.PodPending && modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue {
|
|
logger.Log.Infof(uiUtils.Red, fmt.Sprintf("Wasn't able to deploy the tapper %s. Reason: \"%s\"", modifiedPod.Name, modifiedPod.Status.Conditions[0].Message))
|
|
cancel()
|
|
continue
|
|
}
|
|
|
|
podStatus := modifiedPod.Status
|
|
|
|
if podStatus.Phase == core.PodRunning {
|
|
state := podStatus.ContainerStatuses[0].State
|
|
if state.Terminated != nil {
|
|
switch state.Terminated.Reason {
|
|
case "OOMKilled":
|
|
logger.Log.Infof(uiUtils.Red, fmt.Sprintf("Tapper %s was terminated (reason: OOMKilled). You should consider increasing machine resources.", modifiedPod.Name))
|
|
}
|
|
}
|
|
}
|
|
|
|
logger.Log.Debugf("Tapper %s is %s", modifiedPod.Name, strings.ToLower(string(podStatus.Phase)))
|
|
case err, ok := <-errorChan:
|
|
if !ok {
|
|
errorChan = nil
|
|
continue
|
|
}
|
|
|
|
logger.Log.Errorf("[Error] Error in mizu tapper pod watch, err: %v", err)
|
|
cancel()
|
|
|
|
case <-ctx.Done():
|
|
logger.Log.Debugf("Watching tapper pod loop, ctx done")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func watchMizuEvents(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, startTime time.Time) {
|
|
// Round down because k8s CreationTimestamp is given in 1 sec resolution.
|
|
startTime = startTime.Truncate(time.Second)
|
|
|
|
mizuResourceRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", kubernetes.MizuResourcesPrefix))
|
|
eventWatchHelper := kubernetes.NewEventWatchHelper(kubernetesProvider, mizuResourceRegex)
|
|
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, eventWatchHelper, []string{config.Config.MizuResourcesNamespace}, eventWatchHelper)
|
|
|
|
for {
|
|
select {
|
|
case wEvent, ok := <-added:
|
|
if !ok {
|
|
added = nil
|
|
continue
|
|
}
|
|
|
|
event, err := wEvent.ToEvent()
|
|
if err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource added event: %+v", err))
|
|
cancel()
|
|
}
|
|
|
|
if startTime.After(event.CreationTimestamp.Time) {
|
|
continue
|
|
}
|
|
|
|
if event.Type == core.EventTypeWarning {
|
|
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note))
|
|
}
|
|
case wEvent, ok := <-removed:
|
|
if !ok {
|
|
removed = nil
|
|
continue
|
|
}
|
|
|
|
event, err := wEvent.ToEvent()
|
|
if err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource removed event: %+v", err))
|
|
cancel()
|
|
}
|
|
|
|
if startTime.After(event.CreationTimestamp.Time) {
|
|
continue
|
|
}
|
|
|
|
if event.Type == core.EventTypeWarning {
|
|
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note))
|
|
}
|
|
case wEvent, ok := <-modified:
|
|
if !ok {
|
|
modified = nil
|
|
continue
|
|
}
|
|
|
|
event, err := wEvent.ToEvent()
|
|
if err != nil {
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("error parsing Mizu resource modified event: %+v", err))
|
|
cancel()
|
|
}
|
|
|
|
if startTime.After(event.CreationTimestamp.Time) {
|
|
continue
|
|
}
|
|
|
|
if event.Type == core.EventTypeWarning {
|
|
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Resource %s in state %s - %s", event.Regarding.Name, event.Reason, event.Note))
|
|
}
|
|
case err, ok := <-errorChan:
|
|
if !ok {
|
|
errorChan = nil
|
|
continue
|
|
}
|
|
|
|
logger.Log.Errorf("error in watch mizu resource events loop: %+v", err)
|
|
cancel()
|
|
|
|
case <-ctx.Done():
|
|
logger.Log.Debugf("watching Mizu resource events loop, ctx done")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func getNamespaces(kubernetesProvider *kubernetes.Provider) []string {
|
|
if config.Config.Tap.AllNamespaces {
|
|
return []string{kubernetes.K8sAllNamespaces}
|
|
} else if len(config.Config.Tap.Namespaces) > 0 {
|
|
return shared.Unique(config.Config.Tap.Namespaces)
|
|
} else {
|
|
currentNamespace, err := kubernetesProvider.CurrentNamespace()
|
|
if err != nil {
|
|
logger.Log.Fatalf(uiUtils.Red, fmt.Sprintf("error getting current namespace: %+v", err))
|
|
}
|
|
return []string{currentNamespace}
|
|
}
|
|
}
|
|
|
|
func createRBACIfNecessary(ctx context.Context, kubernetesProvider *kubernetes.Provider) (bool, error) {
|
|
if !config.Config.IsNsRestrictedMode() {
|
|
if err := kubernetesProvider.CreateMizuRBAC(ctx, config.Config.MizuResourcesNamespace, kubernetes.ServiceAccountName, kubernetes.ClusterRoleName, kubernetes.ClusterRoleBindingName, mizu.RBACVersion); err != nil {
|
|
return false, err
|
|
}
|
|
} else {
|
|
if err := kubernetesProvider.CreateMizuRBACNamespaceRestricted(ctx, config.Config.MizuResourcesNamespace, kubernetes.ServiceAccountName, kubernetes.RoleName, kubernetes.RoleBindingName, mizu.RBACVersion); err != nil {
|
|
return false, err
|
|
}
|
|
}
|
|
if config.Config.Tap.DaemonMode {
|
|
if err := kubernetesProvider.CreateDaemonsetRBAC(ctx, config.Config.MizuResourcesNamespace, kubernetes.ServiceAccountName, kubernetes.DaemonRoleName, kubernetes.DaemonRoleBindingName, mizu.RBACVersion); err != nil {
|
|
return false, err
|
|
}
|
|
}
|
|
return true, nil
|
|
}
|