|
|
|
|
@@ -109,19 +109,17 @@ func RunMizuTap() {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
nodeToTappedPodIPMap := getNodeHostToTappedPodIpsMap(state.currentlyTappedPods)
|
|
|
|
|
|
|
|
|
|
defer finishMizuExecution(kubernetesProvider)
|
|
|
|
|
if err := createMizuResources(ctx, kubernetesProvider, nodeToTappedPodIPMap, mizuApiFilteringOptions, mizuValidationRules); err != nil {
|
|
|
|
|
if err := createMizuResources(ctx, kubernetesProvider, mizuApiFilteringOptions, mizuValidationRules); err != nil {
|
|
|
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error creating resources: %v", errormessage.FormatError(err)))
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
go goUtils.HandleExcWrapper(watchApiServerPod, ctx, kubernetesProvider, cancel)
|
|
|
|
|
go goUtils.HandleExcWrapper(watchTapperPod, ctx, kubernetesProvider, cancel, nodeToTappedPodIPMap)
|
|
|
|
|
go goUtils.HandleExcWrapper(watchApiServerPod, ctx, kubernetesProvider, cancel, mizuApiFilteringOptions)
|
|
|
|
|
go goUtils.HandleExcWrapper(watchTapperPod, ctx, kubernetesProvider, cancel)
|
|
|
|
|
go goUtils.HandleExcWrapper(watchPodsForTapping, ctx, kubernetesProvider, targetNamespaces, cancel, mizuApiFilteringOptions)
|
|
|
|
|
|
|
|
|
|
//block until exit signal or error
|
|
|
|
|
// block until exit signal or error
|
|
|
|
|
waitForFinish(ctx, cancel)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -134,7 +132,7 @@ func readValidationRules(file string) (string, error) {
|
|
|
|
|
return string(newContent), nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, mizuApiFilteringOptions *api.TrafficFilteringOptions, mizuValidationRules string) error {
|
|
|
|
|
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, mizuApiFilteringOptions *api.TrafficFilteringOptions, mizuValidationRules string) error {
|
|
|
|
|
if !config.Config.IsNsRestrictedMode() {
|
|
|
|
|
if err := createMizuNamespace(ctx, kubernetesProvider); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
@@ -145,10 +143,6 @@ func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Pro
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, mizuApiFilteringOptions); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if err := createMizuConfigmap(ctx, kubernetesProvider, mizuValidationRules); err != nil {
|
|
|
|
|
logger.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Failed to create resources required for policy validation. Mizu will not validate policy rules. error: %v\n", errormessage.FormatError(err)))
|
|
|
|
|
}
|
|
|
|
|
@@ -228,7 +222,9 @@ func getMizuApiFilteringOptions() (*api.TrafficFilteringOptions, error) {
|
|
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, mizuApiFilteringOptions *api.TrafficFilteringOptions) error {
|
|
|
|
|
func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, mizuApiFilteringOptions *api.TrafficFilteringOptions) error {
|
|
|
|
|
nodeToTappedPodIPMap := getNodeHostToTappedPodIpsMap(state.currentlyTappedPods)
|
|
|
|
|
|
|
|
|
|
if len(nodeToTappedPodIPMap) > 0 {
|
|
|
|
|
var serviceAccountName string
|
|
|
|
|
if state.mizuServiceAccountExists {
|
|
|
|
|
@@ -407,13 +403,8 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
|
|
|
|
|
logger.Log.Debugf("[Error] failed update tapped pods %v", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
nodeToTappedPodIPMap := getNodeHostToTappedPodIpsMap(state.currentlyTappedPods)
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error building node to ips map: %v", errormessage.FormatError(err)))
|
|
|
|
|
cancel()
|
|
|
|
|
}
|
|
|
|
|
if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, mizuApiFilteringOptions); err != nil {
|
|
|
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error updating daemonset: %v", errormessage.FormatError(err)))
|
|
|
|
|
if err := updateMizuTappers(ctx, kubernetesProvider, mizuApiFilteringOptions); err != nil {
|
|
|
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error updating tappers: %v", errormessage.FormatError(err)))
|
|
|
|
|
cancel()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -531,7 +522,7 @@ func getMissingPods(pods1 []core.Pod, pods2 []core.Pod) []core.Pod {
|
|
|
|
|
return missingPods
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
|
|
|
|
|
func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, mizuApiFilteringOptions *api.TrafficFilteringOptions) {
|
|
|
|
|
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", mizu.ApiServerPodName))
|
|
|
|
|
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, []string{config.Config.MizuResourcesNamespace}, podExactRegex)
|
|
|
|
|
isPodReady := false
|
|
|
|
|
@@ -561,6 +552,23 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
logger.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase)
|
|
|
|
|
|
|
|
|
|
if modifiedPod.Status.Phase == core.PodPending {
|
|
|
|
|
if modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue {
|
|
|
|
|
logger.Log.Debugf("Wasn't able to deploy the API server. Reason: \"%s\"", modifiedPod.Status.Conditions[0].Message)
|
|
|
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server, for more info check logs at %s", logger.GetLogFilePath()))
|
|
|
|
|
cancel()
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if len(modifiedPod.Status.ContainerStatuses) > 0 && modifiedPod.Status.ContainerStatuses[0].State.Waiting != nil && modifiedPod.Status.ContainerStatuses[0].State.Waiting.Reason == "ErrImagePull" {
|
|
|
|
|
logger.Log.Debugf("Wasn't able to deploy the API server. (ErrImagePull) Reason: \"%s\"", modifiedPod.Status.ContainerStatuses[0].State.Waiting.Message)
|
|
|
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server: failed to pull the image, for more info check logs at %v", logger.GetLogFilePath()))
|
|
|
|
|
cancel()
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if modifiedPod.Status.Phase == core.PodRunning && !isPodReady {
|
|
|
|
|
isPodReady = true
|
|
|
|
|
go startProxyReportErrorIfAny(kubernetesProvider, cancel)
|
|
|
|
|
@@ -571,6 +579,10 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
|
|
|
|
|
cancel()
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
if err := updateMizuTappers(ctx, kubernetesProvider, mizuApiFilteringOptions); err != nil {
|
|
|
|
|
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error updating tappers: %v", errormessage.FormatError(err)))
|
|
|
|
|
cancel()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
logger.Log.Infof("Mizu is available at %s\n", url)
|
|
|
|
|
openBrowser(url)
|
|
|
|
|
@@ -579,13 +591,13 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
|
|
|
|
|
logger.Log.Debugf("[Error] failed update tapped pods %v", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
case _, ok := <-errorChan:
|
|
|
|
|
case err, ok := <-errorChan:
|
|
|
|
|
if !ok {
|
|
|
|
|
errorChan = nil
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
logger.Log.Debugf("[ERROR] Agent creation, watching %v namespace", config.Config.MizuResourcesNamespace)
|
|
|
|
|
logger.Log.Debugf("[ERROR] Agent creation, watching %v namespace, error: %v", config.Config.MizuResourcesNamespace, err)
|
|
|
|
|
cancel()
|
|
|
|
|
|
|
|
|
|
case <-timeAfter:
|
|
|
|
|
@@ -600,14 +612,10 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, nodeToTappedPodIPMap map[string][]string) {
|
|
|
|
|
func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
|
|
|
|
|
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s.*", mizu.TapperDaemonSetName))
|
|
|
|
|
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, []string{config.Config.MizuResourcesNamespace}, podExactRegex)
|
|
|
|
|
var prevPodPhase core.PodPhase
|
|
|
|
|
var appendMetaname bool
|
|
|
|
|
if len(nodeToTappedPodIPMap) > 1 {
|
|
|
|
|
appendMetaname = true
|
|
|
|
|
}
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case addedPod, ok := <-added:
|
|
|
|
|
@@ -616,22 +624,14 @@ func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if appendMetaname {
|
|
|
|
|
logger.Log.Debugf("Tapper is created [%s]", addedPod.ObjectMeta.Name)
|
|
|
|
|
} else {
|
|
|
|
|
logger.Log.Debugf("Tapper is created")
|
|
|
|
|
}
|
|
|
|
|
logger.Log.Debugf("Tapper is created [%s]", addedPod.Name)
|
|
|
|
|
case removedPod, ok := <-removed:
|
|
|
|
|
if !ok {
|
|
|
|
|
removed = nil
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if appendMetaname {
|
|
|
|
|
logger.Log.Debugf("Tapper is removed [%s]", removedPod.ObjectMeta.Name)
|
|
|
|
|
} else {
|
|
|
|
|
logger.Log.Debugf("Tapper is removed")
|
|
|
|
|
}
|
|
|
|
|
logger.Log.Debugf("Tapper is removed [%s]", removedPod.Name)
|
|
|
|
|
case modifiedPod, ok := <-modified:
|
|
|
|
|
if !ok {
|
|
|
|
|
modified = nil
|
|
|
|
|
@@ -639,13 +639,14 @@ func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if modifiedPod.Status.Phase == core.PodPending && modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue {
|
|
|
|
|
logger.Log.Infof(uiUtils.Red, "Cannot deploy the tapper. Reason: \"%s\"", modifiedPod.Status.Conditions[0].Message)
|
|
|
|
|
logger.Log.Infof(uiUtils.Red, "Wasn't able to deploy the tapper %s. Reason: \"%s\"", modifiedPod.Name, modifiedPod.Status.Conditions[0].Message)
|
|
|
|
|
cancel()
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
podStatus := modifiedPod.Status
|
|
|
|
|
if podStatus.Phase == core.PodPending && prevPodPhase == podStatus.Phase {
|
|
|
|
|
logger.Log.Debugf("Tapper %s is %s", modifiedPod.Name, strings.ToLower(string(podStatus.Phase)))
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
prevPodPhase = podStatus.Phase
|
|
|
|
|
@@ -655,22 +656,19 @@ func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider
|
|
|
|
|
if state.Terminated != nil {
|
|
|
|
|
switch state.Terminated.Reason {
|
|
|
|
|
case "OOMKilled":
|
|
|
|
|
logger.Log.Infof(uiUtils.Red, "Tapper is terminated! OOMKilled. Increase pod resources.")
|
|
|
|
|
logger.Log.Infof(uiUtils.Red, "Tapper %s was terminated (reason: OOMKilled). You should consider increasing machine resources.", modifiedPod.Name)
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
logger.Log.Debugf("Tapper is %s", strings.ToLower(string(podStatus.Phase)))
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
logger.Log.Debugf("Tapper is %s", strings.ToLower(string(podStatus.Phase)))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
case _, ok := <-errorChan:
|
|
|
|
|
logger.Log.Debugf("Tapper %s is %s", modifiedPod.Name, strings.ToLower(string(podStatus.Phase)))
|
|
|
|
|
case err, ok := <-errorChan:
|
|
|
|
|
if !ok {
|
|
|
|
|
errorChan = nil
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
logger.Log.Errorf("[ERROR] Tapper creation, watching %v namespace", config.Config.MizuResourcesNamespace)
|
|
|
|
|
logger.Log.Debugf("[Error] Error in mizu tapper watch, err: %v", err)
|
|
|
|
|
cancel()
|
|
|
|
|
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
|