mirror of
https://github.com/kubeshark/kubeshark.git
synced 2026-02-21 05:20:25 +00:00
Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ebbe6458a8 | ||
|
|
7f2021c312 | ||
|
|
824945141a | ||
|
|
0244f12167 | ||
|
|
60533a9591 | ||
|
|
90f0f603c7 | ||
|
|
683d199774 | ||
|
|
fa632b49a7 |
@@ -13,6 +13,7 @@ require (
|
||||
github.com/go-playground/validator/v10 v10.5.0
|
||||
github.com/google/martian v2.1.0+incompatible
|
||||
github.com/gorilla/websocket v1.4.2
|
||||
github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible
|
||||
github.com/romana/rlog v0.0.0-20171115192701-f018bc92e7d7
|
||||
github.com/up9inc/mizu/shared v0.0.0
|
||||
|
||||
@@ -28,13 +28,13 @@ var k8sResolver *resolver.Resolver
|
||||
|
||||
func StartResolving(namespace string) {
|
||||
errOut := make(chan error, 100)
|
||||
res, err := resolver.NewFromInCluster(errOut)
|
||||
res, err := resolver.NewFromInCluster(errOut, namespace)
|
||||
if err != nil {
|
||||
rlog.Infof("error creating k8s resolver %s", err)
|
||||
return
|
||||
}
|
||||
ctx := context.Background()
|
||||
res.Start(ctx, namespace)
|
||||
res.Start(ctx)
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
|
||||
@@ -2,9 +2,9 @@ package api
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/romana/rlog"
|
||||
"github.com/up9inc/mizu/shared/debounce"
|
||||
"net/http"
|
||||
"sync"
|
||||
@@ -50,7 +50,7 @@ func WebSocketRoutes(app *gin.Engine, eventHandlers EventHandlers) {
|
||||
func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers EventHandlers, isTapper bool) {
|
||||
conn, err := websocketUpgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
fmt.Println("Failed to set websocket upgrade: %+v", err)
|
||||
rlog.Errorf("Failed to set websocket upgrade: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -71,7 +71,7 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even
|
||||
for {
|
||||
_, msg, err := conn.ReadMessage()
|
||||
if err != nil {
|
||||
fmt.Printf("Conn err: %v\n", err)
|
||||
rlog.Errorf("Error reading message, socket id: %d, error: %v", socketId, err)
|
||||
break
|
||||
}
|
||||
eventHandlers.WebSocketMessage(socketId, msg)
|
||||
@@ -81,7 +81,7 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even
|
||||
func socketCleanup(socketId int, socketConnection *SocketConnection) {
|
||||
err := socketConnection.connection.Close()
|
||||
if err != nil {
|
||||
fmt.Printf("Error closing socket connection for socket id %d: %v\n", socketId, err)
|
||||
rlog.Errorf("Error closing socket connection for socket id %d: %v\n", socketId, err)
|
||||
}
|
||||
|
||||
websocketIdsLock.Lock()
|
||||
@@ -92,7 +92,7 @@ func socketCleanup(socketId int, socketConnection *SocketConnection) {
|
||||
}
|
||||
|
||||
var db = debounce.NewDebouncer(time.Second*5, func() {
|
||||
fmt.Println("Successfully sent to socket")
|
||||
rlog.Error("Successfully sent to socket")
|
||||
})
|
||||
|
||||
func SendToSocket(socketId int, message []byte) error {
|
||||
@@ -104,7 +104,7 @@ func SendToSocket(socketId int, message []byte) error {
|
||||
var sent = false
|
||||
time.AfterFunc(time.Second*5, func() {
|
||||
if !sent {
|
||||
fmt.Println("Socket timed out")
|
||||
rlog.Error("Socket timed out")
|
||||
socketCleanup(socketId, socketObj)
|
||||
}
|
||||
})
|
||||
|
||||
@@ -52,7 +52,7 @@ func BroadcastToBrowserClients(message []byte) {
|
||||
go func(socketId int) {
|
||||
err := SendToSocket(socketId, message)
|
||||
if err != nil {
|
||||
fmt.Printf("error sending message to socket ID %d: %v", socketId, err)
|
||||
rlog.Errorf("error sending message to socket ID %d: %v", socketId, err)
|
||||
}
|
||||
}(socketId)
|
||||
}
|
||||
@@ -114,7 +114,7 @@ func handleTLSLink(outboundLinkMessage models.WebsocketOutboundLinkMessage) {
|
||||
if err != nil {
|
||||
rlog.Errorf("Error marshaling outbound link message for broadcasting: %v", err)
|
||||
} else {
|
||||
fmt.Printf("Broadcasting outboundlink message %s\n", string(marshaledMessage))
|
||||
rlog.Errorf("Broadcasting outboundlink message %s", string(marshaledMessage))
|
||||
BroadcastToBrowserClients(marshaledMessage)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"github.com/romana/rlog"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"github.com/up9inc/mizu/shared/debounce"
|
||||
"github.com/up9inc/mizu/shared/units"
|
||||
@@ -47,7 +47,7 @@ func StartEnforcingDatabaseSize() {
|
||||
if !ok {
|
||||
return // closed channel
|
||||
}
|
||||
fmt.Printf("filesystem watcher encountered error:%v\n", err)
|
||||
rlog.Errorf("filesystem watcher encountered error:%v", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
@@ -72,7 +72,7 @@ func getMaxEntriesDBByteSize() (int64, error) {
|
||||
func checkFileSize(maxSizeBytes int64) {
|
||||
fileStat, err := os.Stat(DBPath)
|
||||
if err != nil {
|
||||
fmt.Printf("Error checking %s file size: %v\n", DBPath, err)
|
||||
rlog.Errorf("Error checking %s file size: %v", DBPath, err)
|
||||
} else {
|
||||
if fileStat.Size() > maxSizeBytes {
|
||||
pruneOldEntries(fileStat.Size())
|
||||
@@ -83,13 +83,13 @@ func checkFileSize(maxSizeBytes int64) {
|
||||
func pruneOldEntries(currentFileSize int64) {
|
||||
// sqlite locks the database while delete or VACUUM are running and sqlite is terrible at handling its own db lock while a lot of inserts are attempted, we prevent a significant bottleneck by handling the db lock ourselves here
|
||||
IsDBLocked = true
|
||||
defer func() {IsDBLocked = false}()
|
||||
defer func() { IsDBLocked = false }()
|
||||
|
||||
amountOfBytesToTrim := currentFileSize / (100 / percentageOfMaxSizeBytesToPrune)
|
||||
|
||||
rows, err := GetEntriesTable().Limit(10000).Order("id").Rows()
|
||||
if err != nil {
|
||||
fmt.Printf("Error getting 10000 first db rows: %v\n", err)
|
||||
rlog.Errorf("Error getting 10000 first db rows: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -102,7 +102,7 @@ func pruneOldEntries(currentFileSize int64) {
|
||||
var entry models.MizuEntry
|
||||
err = DB.ScanRows(rows, &entry)
|
||||
if err != nil {
|
||||
fmt.Printf("Error scanning db row: %v\n", err)
|
||||
rlog.Errorf("Error scanning db row: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -114,8 +114,8 @@ func pruneOldEntries(currentFileSize int64) {
|
||||
GetEntriesTable().Where(entryIdsToRemove).Delete(models.MizuEntry{})
|
||||
// VACUUM causes sqlite to shrink the db file after rows have been deleted, the db file will not shrink without this
|
||||
DB.Exec("VACUUM")
|
||||
fmt.Printf("Removed %d rows and cleared %s\n", len(entryIdsToRemove), units.BytesToHumanReadable(bytesToBeRemoved))
|
||||
rlog.Errorf("Removed %d rows and cleared %s", len(entryIdsToRemove), units.BytesToHumanReadable(bytesToBeRemoved))
|
||||
} else {
|
||||
fmt.Println("Found no rows to remove when pruning")
|
||||
rlog.Error("Found no rows to remove when pruning")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,7 +32,7 @@ Now you will be able to import `github.com/up9inc/mizu/resolver` in any `.go` fi
|
||||
errOut := make(chan error, 100)
|
||||
k8sResolver, err := resolver.NewFromOutOfCluster("", errOut)
|
||||
if err != nil {
|
||||
fmt.Printf("error creating k8s resolver %s", err)
|
||||
rlog.Errorf("error creating k8s resolver %s", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
@@ -40,15 +40,15 @@ k8sResolver.Start(ctx)
|
||||
|
||||
resolvedName := k8sResolver.Resolve("10.107.251.91") // will always return `nil` in real scenarios as the internal map takes a moment to populate after `Start` is called
|
||||
if resolvedName != nil {
|
||||
fmt.Printf("resolved 10.107.251.91=%s", *resolvedName)
|
||||
rlog.Errorf("resolved 10.107.251.91=%s", *resolvedName)
|
||||
} else {
|
||||
fmt.Printf("Could not find a resolved name for 10.107.251.91")
|
||||
rlog.Error("Could not find a resolved name for 10.107.251.91")
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case err := <- errOut:
|
||||
fmt.Printf("name resolving error %s", err)
|
||||
rlog.Errorf("name resolving error %s", err)
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package resolver
|
||||
|
||||
import (
|
||||
cmap "github.com/orcaman/concurrent-map"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
_ "k8s.io/client-go/plugin/pkg/client/auth/azure"
|
||||
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
|
||||
@@ -9,7 +10,7 @@ import (
|
||||
restclient "k8s.io/client-go/rest"
|
||||
)
|
||||
|
||||
func NewFromInCluster(errOut chan error) (*Resolver, error) {
|
||||
func NewFromInCluster(errOut chan error, namesapce string) (*Resolver, error) {
|
||||
config, err := restclient.InClusterConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -18,5 +19,5 @@ func NewFromInCluster(errOut chan error) (*Resolver, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Resolver{clientConfig: config, clientSet: clientset, nameMap: make(map[string]string), serviceMap: make(map[string]string), errOut: errOut}, nil
|
||||
return &Resolver{clientConfig: config, clientSet: clientset, nameMap: cmap.New(), serviceMap: cmap.New(), errOut: errOut, namespace: namesapce}, nil
|
||||
}
|
||||
|
||||
@@ -7,12 +7,14 @@ import (
|
||||
"github.com/romana/rlog"
|
||||
k8serrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
|
||||
"github.com/orcaman/concurrent-map"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
)
|
||||
|
||||
const (
|
||||
kubClientNullString = "None"
|
||||
)
|
||||
@@ -20,17 +22,16 @@ const (
|
||||
type Resolver struct {
|
||||
clientConfig *restclient.Config
|
||||
clientSet *kubernetes.Clientset
|
||||
nameMap map[string]string
|
||||
serviceMap map[string]string
|
||||
nameMap cmap.ConcurrentMap
|
||||
serviceMap cmap.ConcurrentMap
|
||||
isStarted bool
|
||||
errOut chan error
|
||||
namespace string
|
||||
}
|
||||
|
||||
func (resolver *Resolver) Start(ctx context.Context, namespace string) {
|
||||
func (resolver *Resolver) Start(ctx context.Context) {
|
||||
if !resolver.isStarted {
|
||||
resolver.isStarted = true
|
||||
resolver.namespace = namespace
|
||||
|
||||
go resolver.infiniteErrorHandleRetryFunc(ctx, resolver.watchServices)
|
||||
go resolver.infiniteErrorHandleRetryFunc(ctx, resolver.watchEndpoints)
|
||||
@@ -39,19 +40,19 @@ func (resolver *Resolver) Start(ctx context.Context, namespace string) {
|
||||
}
|
||||
|
||||
func (resolver *Resolver) Resolve(name string) string {
|
||||
resolvedName, isFound := resolver.nameMap[name]
|
||||
resolvedName, isFound := resolver.nameMap.Get(name)
|
||||
if !isFound {
|
||||
return ""
|
||||
}
|
||||
return resolvedName
|
||||
return resolvedName.(string)
|
||||
}
|
||||
|
||||
func (resolver *Resolver) GetMap() map[string]string {
|
||||
func (resolver *Resolver) GetMap() cmap.ConcurrentMap {
|
||||
return resolver.nameMap
|
||||
}
|
||||
|
||||
func (resolver *Resolver) CheckIsServiceIP(address string) bool {
|
||||
_, isFound := resolver.serviceMap[address]
|
||||
_, isFound := resolver.serviceMap.Get(address)
|
||||
return isFound
|
||||
}
|
||||
|
||||
@@ -63,17 +64,17 @@ func (resolver *Resolver) watchPods(ctx context.Context) error {
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case event := <- watcher.ResultChan():
|
||||
if event.Object == nil {
|
||||
return errors.New("error in kubectl pod watch")
|
||||
}
|
||||
if event.Type == watch.Deleted {
|
||||
pod := event.Object.(*corev1.Pod)
|
||||
resolver.saveResolvedName(pod.Status.PodIP, "", event.Type)
|
||||
}
|
||||
case <- ctx.Done():
|
||||
watcher.Stop()
|
||||
return nil
|
||||
case event := <-watcher.ResultChan():
|
||||
if event.Object == nil {
|
||||
return errors.New("error in kubectl pod watch")
|
||||
}
|
||||
if event.Type == watch.Deleted {
|
||||
pod := event.Object.(*corev1.Pod)
|
||||
resolver.saveResolvedName(pod.Status.PodIP, "", event.Type)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
watcher.Stop()
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -86,37 +87,37 @@ func (resolver *Resolver) watchEndpoints(ctx context.Context) error {
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case event := <- watcher.ResultChan():
|
||||
if event.Object == nil {
|
||||
return errors.New("error in kubectl endpoint watch")
|
||||
}
|
||||
endpoint := event.Object.(*corev1.Endpoints)
|
||||
serviceHostname := fmt.Sprintf("%s.%s", endpoint.Name, endpoint.Namespace)
|
||||
if endpoint.Subsets != nil {
|
||||
for _, subset := range endpoint.Subsets {
|
||||
var ports []int32
|
||||
if subset.Ports != nil {
|
||||
for _, portMapping := range subset.Ports {
|
||||
if portMapping.Port > 0 {
|
||||
ports = append(ports, portMapping.Port)
|
||||
}
|
||||
case event := <-watcher.ResultChan():
|
||||
if event.Object == nil {
|
||||
return errors.New("error in kubectl endpoint watch")
|
||||
}
|
||||
endpoint := event.Object.(*corev1.Endpoints)
|
||||
serviceHostname := fmt.Sprintf("%s.%s", endpoint.Name, endpoint.Namespace)
|
||||
if endpoint.Subsets != nil {
|
||||
for _, subset := range endpoint.Subsets {
|
||||
var ports []int32
|
||||
if subset.Ports != nil {
|
||||
for _, portMapping := range subset.Ports {
|
||||
if portMapping.Port > 0 {
|
||||
ports = append(ports, portMapping.Port)
|
||||
}
|
||||
}
|
||||
if subset.Addresses != nil {
|
||||
for _, address := range subset.Addresses {
|
||||
resolver.saveResolvedName(address.IP, serviceHostname, event.Type)
|
||||
for _, port := range ports {
|
||||
ipWithPort := fmt.Sprintf("%s:%d", address.IP, port)
|
||||
resolver.saveResolvedName(ipWithPort, serviceHostname, event.Type)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
if subset.Addresses != nil {
|
||||
for _, address := range subset.Addresses {
|
||||
resolver.saveResolvedName(address.IP, serviceHostname, event.Type)
|
||||
for _, port := range ports {
|
||||
ipWithPort := fmt.Sprintf("%s:%d", address.IP, port)
|
||||
resolver.saveResolvedName(ipWithPort, serviceHostname, event.Type)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
case <- ctx.Done():
|
||||
watcher.Stop()
|
||||
return nil
|
||||
}
|
||||
case <-ctx.Done():
|
||||
watcher.Stop()
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -129,7 +130,7 @@ func (resolver *Resolver) watchServices(ctx context.Context) error {
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case event := <- watcher.ResultChan():
|
||||
case event := <-watcher.ResultChan():
|
||||
if event.Object == nil {
|
||||
return errors.New("error in kubectl service watch")
|
||||
}
|
||||
@@ -145,7 +146,7 @@ func (resolver *Resolver) watchServices(ctx context.Context) error {
|
||||
resolver.saveResolvedName(ingress.IP, serviceHostname, event.Type)
|
||||
}
|
||||
}
|
||||
case <- ctx.Done():
|
||||
case <-ctx.Done():
|
||||
watcher.Stop()
|
||||
return nil
|
||||
}
|
||||
@@ -154,19 +155,19 @@ func (resolver *Resolver) watchServices(ctx context.Context) error {
|
||||
|
||||
func (resolver *Resolver) saveResolvedName(key string, resolved string, eventType watch.EventType) {
|
||||
if eventType == watch.Deleted {
|
||||
delete(resolver.nameMap, key)
|
||||
resolver.nameMap.Remove(key)
|
||||
rlog.Infof("setting %s=nil\n", key)
|
||||
} else {
|
||||
resolver.nameMap[key] = resolved
|
||||
resolver.nameMap.Set(key, resolved)
|
||||
rlog.Infof("setting %s=%s\n", key, resolved)
|
||||
}
|
||||
}
|
||||
|
||||
func (resolver *Resolver) saveServiceIP(key string, resolved string, eventType watch.EventType) {
|
||||
if eventType == watch.Deleted {
|
||||
delete(resolver.serviceMap, key)
|
||||
resolver.serviceMap.Remove(key)
|
||||
} else {
|
||||
resolver.serviceMap[key] = resolved
|
||||
resolver.serviceMap.Set(key, resolved)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -189,4 +190,3 @@ func (resolver *Resolver) infiniteErrorHandleRetryFunc(ctx context.Context, fun
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ package utils
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/romana/rlog"
|
||||
"gorm.io/gorm/logger"
|
||||
"gorm.io/gorm/utils"
|
||||
"time"
|
||||
@@ -10,7 +11,7 @@ import (
|
||||
|
||||
// TruncatingLogger implements the gorm logger.Interface interface. Its purpose is to act as gorm's logger while truncating logs to a max of 50 characters to minimise the performance impact
|
||||
type TruncatingLogger struct {
|
||||
LogLevel logger.LogLevel
|
||||
LogLevel logger.LogLevel
|
||||
SlowThreshold time.Duration
|
||||
}
|
||||
|
||||
@@ -23,21 +24,21 @@ func (truncatingLogger *TruncatingLogger) Info(_ context.Context, message string
|
||||
if truncatingLogger.LogLevel < logger.Info {
|
||||
return
|
||||
}
|
||||
fmt.Printf("gorm info: %.150s\n", message)
|
||||
rlog.Errorf("gorm info: %.150s", message)
|
||||
}
|
||||
|
||||
func (truncatingLogger *TruncatingLogger) Warn(_ context.Context, message string, __ ...interface{}) {
|
||||
if truncatingLogger.LogLevel < logger.Warn {
|
||||
return
|
||||
}
|
||||
fmt.Printf("gorm warning: %.150s\n", message)
|
||||
rlog.Errorf("gorm warning: %.150s", message)
|
||||
}
|
||||
|
||||
func (truncatingLogger *TruncatingLogger) Error(_ context.Context, message string, __ ...interface{}) {
|
||||
if truncatingLogger.LogLevel < logger.Error {
|
||||
return
|
||||
}
|
||||
fmt.Printf("gorm error: %.150s\n", message)
|
||||
rlog.Errorf("gorm error: %.150s", message)
|
||||
}
|
||||
|
||||
func (truncatingLogger *TruncatingLogger) Trace(ctx context.Context, begin time.Time, fc func() (string, int64), err error) {
|
||||
|
||||
46
cli/cmd/logs.go
Normal file
46
cli/cmd/logs.go
Normal file
@@ -0,0 +1,46 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/up9inc/mizu/cli/fsUtils"
|
||||
"github.com/up9inc/mizu/cli/kubernetes"
|
||||
"github.com/up9inc/mizu/cli/mizu"
|
||||
"os"
|
||||
"path"
|
||||
)
|
||||
|
||||
var filePath string
|
||||
|
||||
var logsCmd = &cobra.Command{
|
||||
Use: "logs",
|
||||
Short: "Create a zip file with logs for Github issue or troubleshoot",
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
kubernetesProvider, err := kubernetes.NewProvider(mizu.Config.View.KubeConfigPath)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
ctx, _ := context.WithCancel(context.Background())
|
||||
|
||||
if filePath == "" {
|
||||
pwd, err := os.Getwd()
|
||||
if err != nil {
|
||||
mizu.Log.Errorf("Failed to get PWD, %v (try using `mizu logs -f <full path dest zip file>)`", err)
|
||||
return nil
|
||||
}
|
||||
filePath = path.Join(pwd, "mizu_logs.zip")
|
||||
}
|
||||
mizu.Log.Debugf("Using file path %s", filePath)
|
||||
|
||||
if err := fsUtils.DumpLogs(kubernetesProvider, ctx, filePath); err != nil {
|
||||
mizu.Log.Errorf("Failed dump logs %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
func init() {
|
||||
rootCmd.AddCommand(logsCmd)
|
||||
logsCmd.Flags().StringVarP(&filePath, "file", "f", "", "Path for zip file (default current <pwd>\\mizu_logs.zip)")
|
||||
}
|
||||
@@ -1,9 +1,9 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/up9inc/mizu/cli/fsUtils"
|
||||
"github.com/up9inc/mizu/cli/mizu"
|
||||
)
|
||||
|
||||
@@ -13,9 +13,12 @@ var rootCmd = &cobra.Command{
|
||||
Long: `A web traffic viewer for kubernetes
|
||||
Further info is available at https://github.com/up9inc/mizu`,
|
||||
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
|
||||
if err := fsUtils.EnsureDir(mizu.GetMizuFolderPath()); err != nil {
|
||||
mizu.Log.Errorf("Failed to use mizu folder, %v", err)
|
||||
}
|
||||
mizu.InitLogger()
|
||||
if err := mizu.InitConfig(cmd); err != nil {
|
||||
mizu.Log.Errorf("Invalid config, Exit %s", err)
|
||||
return errors.New(fmt.Sprintf("%v", err))
|
||||
mizu.Log.Fatal(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@@ -31,6 +31,10 @@ Supported protocols are HTTP and gRPC.`,
|
||||
return errors.New("unexpected number of arguments")
|
||||
}
|
||||
|
||||
if err := mizu.Config.Validate(); err != nil {
|
||||
return errormessage.FormatError(err)
|
||||
}
|
||||
|
||||
if err := mizu.Config.Tap.Validate(); err != nil {
|
||||
return errormessage.FormatError(err)
|
||||
}
|
||||
@@ -56,15 +60,14 @@ func init() {
|
||||
defaults.Set(&defaultTapConfig)
|
||||
|
||||
tapCmd.Flags().Uint16P(configStructs.GuiPortTapName, "p", defaultTapConfig.GuiPort, "Provide a custom port for the web interface webserver")
|
||||
tapCmd.Flags().StringP(configStructs.NamespaceTapName, "n", defaultTapConfig.Namespace, "Namespace selector")
|
||||
tapCmd.Flags().StringArrayP(configStructs.NamespacesTapName, "n", defaultTapConfig.Namespaces, "Namespaces selector")
|
||||
tapCmd.Flags().Bool(configStructs.AnalysisTapName, defaultTapConfig.Analysis, "Uploads traffic to UP9 for further analysis (Beta)")
|
||||
tapCmd.Flags().BoolP(configStructs.AllNamespacesTapName, "A", defaultTapConfig.AllNamespaces, "Tap all namespaces")
|
||||
tapCmd.Flags().StringP(configStructs.KubeConfigPathTapName, "k", defaultTapConfig.KubeConfigPath, "Path to kube-config file")
|
||||
tapCmd.Flags().StringArrayP(configStructs.PlainTextFilterRegexesTapName, "r", defaultTapConfig.PlainTextFilterRegexes, "List of regex expressions that are used to filter matching values from text/plain http bodies")
|
||||
tapCmd.Flags().Bool(configStructs.HideHealthChecksTapName, defaultTapConfig.HideHealthChecks, "hides requests with kube-probe or prometheus user-agent headers")
|
||||
tapCmd.Flags().Bool(configStructs.DisableRedactionTapName, defaultTapConfig.DisableRedaction, "Disables redaction of potentially sensitive request/response headers and body values")
|
||||
tapCmd.Flags().String(configStructs.HumanMaxEntriesDBSizeTapName, defaultTapConfig.HumanMaxEntriesDBSize, "override the default max entries db size of 200mb")
|
||||
tapCmd.Flags().String(configStructs.DirectionTapName, defaultTapConfig.Direction, "Record traffic that goes in this direction (relative to the tapped pod): in/any")
|
||||
tapCmd.Flags().Bool(configStructs.DryRunTapName, defaultTapConfig.DryRun, "Preview of all pods matching the regex, without tapping them")
|
||||
tapCmd.Flags().String(configStructs.EnforcePolicyFile, "", "Yaml file with policy rules")
|
||||
tapCmd.Flags().String(configStructs.EnforcePolicyFile, defaultTapConfig.EnforcePolicyFile, "Yaml file with policy rules")
|
||||
}
|
||||
|
||||
@@ -5,11 +5,16 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/up9inc/mizu/cli/fsUtils"
|
||||
"github.com/up9inc/mizu/cli/goUtils"
|
||||
"github.com/up9inc/mizu/cli/mizu/configStructs"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"os/signal"
|
||||
"path"
|
||||
"regexp"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
@@ -22,7 +27,6 @@ import (
|
||||
yaml "gopkg.in/yaml.v3"
|
||||
core "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -54,40 +58,34 @@ func RunMizuTap() {
|
||||
}
|
||||
}
|
||||
|
||||
kubernetesProvider, err := kubernetes.NewProvider(mizu.Config.Tap.KubeConfigPath)
|
||||
kubernetesProvider, err := kubernetes.NewProvider(mizu.Config.KubeConfigPath)
|
||||
if err != nil {
|
||||
if clientcmd.IsEmptyConfig(err) {
|
||||
mizu.Log.Errorf(uiUtils.Error, "Couldn't find the kube config file, or file is empty. Try adding '--kube-config=<path to kube config file>'\n")
|
||||
return
|
||||
}
|
||||
if clientcmd.IsConfigurationInvalid(err) {
|
||||
mizu.Log.Errorf(uiUtils.Error, "Invalid kube config file. Try using a different config with '--kube-config=<path to kube config file>'\n")
|
||||
return
|
||||
}
|
||||
mizu.Log.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
defer cleanUpMizuResources(kubernetesProvider)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel() // cancel will be called when this function exits
|
||||
|
||||
targetNamespace := getNamespace(kubernetesProvider)
|
||||
targetNamespaces := getNamespaces(kubernetesProvider)
|
||||
|
||||
var namespacesStr string
|
||||
if targetNamespace != mizu.K8sAllNamespaces {
|
||||
namespacesStr = fmt.Sprintf("namespace \"%s\"", targetNamespace)
|
||||
if targetNamespaces[0] != mizu.K8sAllNamespaces {
|
||||
namespacesStr = fmt.Sprintf("namespaces \"%s\"", strings.Join(targetNamespaces, "\", \""))
|
||||
} else {
|
||||
namespacesStr = "all namespaces"
|
||||
}
|
||||
mizu.CheckNewerVersion()
|
||||
mizu.Log.Infof("Tapping pods in %s", namespacesStr)
|
||||
|
||||
if err, _ := updateCurrentlyTappedPods(kubernetesProvider, ctx, targetNamespace); err != nil {
|
||||
if err, _ := updateCurrentlyTappedPods(kubernetesProvider, ctx, targetNamespaces); err != nil {
|
||||
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error getting pods by regex: %v", errormessage.FormatError(err)))
|
||||
return
|
||||
}
|
||||
|
||||
if len(state.currentlyTappedPods) == 0 {
|
||||
var suggestionStr string
|
||||
if targetNamespace != mizu.K8sAllNamespaces {
|
||||
if targetNamespaces[0] != mizu.K8sAllNamespaces {
|
||||
suggestionStr = ". Select a different namespace with -n or tap all namespaces with -A"
|
||||
}
|
||||
mizu.Log.Warningf(uiUtils.Warning, fmt.Sprintf("Did not find any pods matching the regex argument%s", suggestionStr))
|
||||
@@ -99,13 +97,14 @@ func RunMizuTap() {
|
||||
|
||||
nodeToTappedPodIPMap := getNodeHostToTappedPodIpsMap(state.currentlyTappedPods)
|
||||
|
||||
defer cleanUpMizuResources(kubernetesProvider)
|
||||
if err := createMizuResources(ctx, kubernetesProvider, nodeToTappedPodIPMap, mizuApiFilteringOptions, mizuValidationRules); err != nil {
|
||||
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error creating resources: %v", errormessage.FormatError(err)))
|
||||
return
|
||||
}
|
||||
|
||||
go createProxyToApiServerPod(ctx, kubernetesProvider, cancel)
|
||||
go watchPodsForTapping(ctx, kubernetesProvider, cancel)
|
||||
go goUtils.HandleExcWrapper(createProxyToApiServerPod, ctx, kubernetesProvider, cancel)
|
||||
go goUtils.HandleExcWrapper(watchPodsForTapping, ctx, kubernetesProvider, targetNamespaces, cancel)
|
||||
|
||||
//block until exit signal or error
|
||||
waitForFinish(ctx, cancel)
|
||||
@@ -121,7 +120,7 @@ func readValidationRules(file string) (string, error) {
|
||||
}
|
||||
|
||||
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, mizuApiFilteringOptions *shared.TrafficFilteringOptions, mizuValidationRules string) error {
|
||||
if mizu.Config.IsOwnNamespace() {
|
||||
if !mizu.Config.IsNsRestrictedMode() {
|
||||
if err := createMizuNamespace(ctx, kubernetesProvider); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -146,12 +145,12 @@ func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Pro
|
||||
}
|
||||
|
||||
func createMizuConfigmap(ctx context.Context, kubernetesProvider *kubernetes.Provider, data string) error {
|
||||
err := kubernetesProvider.CreateConfigMap(ctx, mizu.Config.ResourcesNamespace(), mizu.ConfigMapName, data)
|
||||
err := kubernetesProvider.CreateConfigMap(ctx, mizu.Config.MizuResourcesNamespace, mizu.ConfigMapName, data)
|
||||
return err
|
||||
}
|
||||
|
||||
func createMizuNamespace(ctx context.Context, kubernetesProvider *kubernetes.Provider) error {
|
||||
_, err := kubernetesProvider.CreateNamespace(ctx, mizu.Config.ResourcesNamespace())
|
||||
_, err := kubernetesProvider.CreateNamespace(ctx, mizu.Config.MizuResourcesNamespace)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -171,13 +170,13 @@ func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Pro
|
||||
}
|
||||
|
||||
opts := &kubernetes.ApiServerOptions{
|
||||
Namespace: mizu.Config.ResourcesNamespace(),
|
||||
PodName: mizu.ApiServerPodName,
|
||||
PodImage: mizu.Config.MizuImage,
|
||||
ServiceAccountName: serviceAccountName,
|
||||
IsNamespaceRestricted: !mizu.Config.IsOwnNamespace(),
|
||||
Namespace: mizu.Config.MizuResourcesNamespace,
|
||||
PodName: mizu.ApiServerPodName,
|
||||
PodImage: mizu.Config.AgentImage,
|
||||
ServiceAccountName: serviceAccountName,
|
||||
IsNamespaceRestricted: mizu.Config.IsNsRestrictedMode(),
|
||||
MizuApiFilteringOptions: mizuApiFilteringOptions,
|
||||
MaxEntriesDBSizeBytes: mizu.Config.Tap.MaxEntriesDBSizeBytes(),
|
||||
MaxEntriesDBSizeBytes: mizu.Config.Tap.MaxEntriesDBSizeBytes(),
|
||||
}
|
||||
_, err = kubernetesProvider.CreateMizuApiServerPod(ctx, opts)
|
||||
if err != nil {
|
||||
@@ -185,7 +184,7 @@ func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Pro
|
||||
}
|
||||
mizu.Log.Debugf("Successfully created API server pod: %s", mizu.ApiServerPodName)
|
||||
|
||||
state.apiServerService, err = kubernetesProvider.CreateService(ctx, mizu.Config.ResourcesNamespace(), mizu.ApiServerPodName, mizu.ApiServerPodName)
|
||||
state.apiServerService, err = kubernetesProvider.CreateService(ctx, mizu.Config.MizuResourcesNamespace, mizu.ApiServerPodName, mizu.ApiServerPodName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -222,9 +221,9 @@ func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provi
|
||||
|
||||
if err := kubernetesProvider.ApplyMizuTapperDaemonSet(
|
||||
ctx,
|
||||
mizu.Config.ResourcesNamespace(),
|
||||
mizu.Config.MizuResourcesNamespace,
|
||||
mizu.TapperDaemonSetName,
|
||||
mizu.Config.MizuImage,
|
||||
mizu.Config.AgentImage,
|
||||
mizu.TapperPodName,
|
||||
fmt.Sprintf("%s.%s.svc.cluster.local", state.apiServerService.Name, state.apiServerService.Namespace),
|
||||
nodeToTappedPodIPMap,
|
||||
@@ -235,7 +234,7 @@ func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provi
|
||||
}
|
||||
mizu.Log.Debugf("Successfully created %v tappers", len(nodeToTappedPodIPMap))
|
||||
} else {
|
||||
if err := kubernetesProvider.RemoveDaemonSet(ctx, mizu.Config.ResourcesNamespace(), mizu.TapperDaemonSetName); err != nil {
|
||||
if err := kubernetesProvider.RemoveDaemonSet(ctx, mizu.Config.MizuResourcesNamespace, mizu.TapperDaemonSetName); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -244,60 +243,69 @@ func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provi
|
||||
}
|
||||
|
||||
func cleanUpMizuResources(kubernetesProvider *kubernetes.Provider) {
|
||||
mizu.Log.Infof("\nRemoving mizu resources\n")
|
||||
|
||||
removalCtx, cancel := context.WithTimeout(context.Background(), cleanupTimeout)
|
||||
defer cancel()
|
||||
|
||||
if mizu.Config.IsOwnNamespace() {
|
||||
if err := kubernetesProvider.RemoveNamespace(removalCtx, mizu.Config.ResourcesNamespace()); err != nil {
|
||||
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Namespace %s: %v", mizu.Config.ResourcesNamespace(), errormessage.FormatError(err)))
|
||||
if mizu.Config.DumpLogs {
|
||||
mizuDir := mizu.GetMizuFolderPath()
|
||||
filePath = path.Join(mizuDir, fmt.Sprintf("mizu_logs_%s.zip", time.Now().Format("2006_01_02__15_04_05")))
|
||||
if err := fsUtils.DumpLogs(kubernetesProvider, removalCtx, filePath); err != nil {
|
||||
mizu.Log.Errorf("Failed dump logs %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
mizu.Log.Infof("\nRemoving mizu resources\n")
|
||||
|
||||
if !mizu.Config.IsNsRestrictedMode() {
|
||||
if err := kubernetesProvider.RemoveNamespace(removalCtx, mizu.Config.MizuResourcesNamespace); err != nil {
|
||||
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Namespace %s: %v", mizu.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
|
||||
return
|
||||
}
|
||||
} else {
|
||||
if err := kubernetesProvider.RemovePod(removalCtx, mizu.Config.ResourcesNamespace(), mizu.ApiServerPodName); err != nil {
|
||||
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Pod %s in namespace %s: %v", mizu.ApiServerPodName, mizu.Config.ResourcesNamespace(), errormessage.FormatError(err)))
|
||||
if err := kubernetesProvider.RemovePod(removalCtx, mizu.Config.MizuResourcesNamespace, mizu.ApiServerPodName); err != nil {
|
||||
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Pod %s in namespace %s: %v", mizu.ApiServerPodName, mizu.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
|
||||
}
|
||||
|
||||
if err := kubernetesProvider.RemoveService(removalCtx, mizu.Config.ResourcesNamespace(), mizu.ApiServerPodName); err != nil {
|
||||
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Service %s in namespace %s: %v", mizu.ApiServerPodName, mizu.Config.ResourcesNamespace(), errormessage.FormatError(err)))
|
||||
if err := kubernetesProvider.RemoveService(removalCtx, mizu.Config.MizuResourcesNamespace, mizu.ApiServerPodName); err != nil {
|
||||
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Service %s in namespace %s: %v", mizu.ApiServerPodName, mizu.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
|
||||
}
|
||||
|
||||
if err := kubernetesProvider.RemoveDaemonSet(removalCtx, mizu.Config.ResourcesNamespace(), mizu.TapperDaemonSetName); err != nil {
|
||||
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing DaemonSet %s in namespace %s: %v", mizu.TapperDaemonSetName, mizu.Config.ResourcesNamespace(), errormessage.FormatError(err)))
|
||||
if err := kubernetesProvider.RemoveDaemonSet(removalCtx, mizu.Config.MizuResourcesNamespace, mizu.TapperDaemonSetName); err != nil {
|
||||
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing DaemonSet %s in namespace %s: %v", mizu.TapperDaemonSetName, mizu.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
|
||||
}
|
||||
|
||||
if !state.doNotRemoveConfigMap {
|
||||
if err := kubernetesProvider.RemoveConfigMap(removalCtx, mizu.Config.ResourcesNamespace(), mizu.ConfigMapName); err != nil {
|
||||
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing ConfigMap %s in namespace %s: %v", mizu.ConfigMapName, mizu.Config.ResourcesNamespace(), errormessage.FormatError(err)))
|
||||
if err := kubernetesProvider.RemoveConfigMap(removalCtx, mizu.Config.MizuResourcesNamespace, mizu.ConfigMapName); err != nil {
|
||||
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing ConfigMap %s in namespace %s: %v", mizu.ConfigMapName, mizu.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if state.mizuServiceAccountExists {
|
||||
if mizu.Config.IsOwnNamespace() {
|
||||
if !mizu.Config.IsNsRestrictedMode() {
|
||||
if err := kubernetesProvider.RemoveNonNamespacedResources(removalCtx, mizu.ClusterRoleName, mizu.ClusterRoleBindingName); err != nil {
|
||||
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing non-namespaced resources: %v", errormessage.FormatError(err)))
|
||||
return
|
||||
}
|
||||
} else {
|
||||
if err := kubernetesProvider.RemoveServicAccount(removalCtx, mizu.Config.ResourcesNamespace(), mizu.ServiceAccountName); err != nil {
|
||||
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Service Account %s in namespace %s: %v", mizu.ServiceAccountName, mizu.Config.ResourcesNamespace(), errormessage.FormatError(err)))
|
||||
if err := kubernetesProvider.RemoveServicAccount(removalCtx, mizu.Config.MizuResourcesNamespace, mizu.ServiceAccountName); err != nil {
|
||||
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Service Account %s in namespace %s: %v", mizu.ServiceAccountName, mizu.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
|
||||
return
|
||||
}
|
||||
|
||||
if err := kubernetesProvider.RemoveRole(removalCtx, mizu.Config.ResourcesNamespace(), mizu.RoleName); err != nil {
|
||||
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Role %s in namespace %s: %v", mizu.RoleName, mizu.Config.ResourcesNamespace(), errormessage.FormatError(err)))
|
||||
if err := kubernetesProvider.RemoveRole(removalCtx, mizu.Config.MizuResourcesNamespace, mizu.RoleName); err != nil {
|
||||
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing Role %s in namespace %s: %v", mizu.RoleName, mizu.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
|
||||
}
|
||||
|
||||
if err := kubernetesProvider.RemoveRoleBinding(removalCtx, mizu.Config.ResourcesNamespace(), mizu.RoleBindingName); err != nil {
|
||||
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing RoleBinding %s in namespace %s: %v", mizu.RoleBindingName, mizu.Config.ResourcesNamespace(), errormessage.FormatError(err)))
|
||||
if err := kubernetesProvider.RemoveRoleBinding(removalCtx, mizu.Config.MizuResourcesNamespace, mizu.RoleBindingName); err != nil {
|
||||
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error removing RoleBinding %s in namespace %s: %v", mizu.RoleBindingName, mizu.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if mizu.Config.IsOwnNamespace() {
|
||||
if !mizu.Config.IsNsRestrictedMode() {
|
||||
waitUntilNamespaceDeleted(removalCtx, cancel, kubernetesProvider)
|
||||
}
|
||||
}
|
||||
@@ -308,14 +316,14 @@ func waitUntilNamespaceDeleted(ctx context.Context, cancel context.CancelFunc, k
|
||||
waitForFinish(ctx, cancel)
|
||||
}()
|
||||
|
||||
if err := kubernetesProvider.WaitUtilNamespaceDeleted(ctx, mizu.Config.ResourcesNamespace()); err != nil {
|
||||
if err := kubernetesProvider.WaitUtilNamespaceDeleted(ctx, mizu.Config.MizuResourcesNamespace); err != nil {
|
||||
switch {
|
||||
case ctx.Err() == context.Canceled:
|
||||
// Do nothing. User interrupted the wait.
|
||||
case err == wait.ErrWaitTimeout:
|
||||
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Timeout while removing Namespace %s", mizu.Config.ResourcesNamespace()))
|
||||
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Timeout while removing Namespace %s", mizu.Config.MizuResourcesNamespace))
|
||||
default:
|
||||
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error while waiting for Namespace %s to be deleted: %v", mizu.Config.ResourcesNamespace(), errormessage.FormatError(err)))
|
||||
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error while waiting for Namespace %s to be deleted: %v", mizu.Config.MizuResourcesNamespace, errormessage.FormatError(err)))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -343,14 +351,13 @@ func reportTappedPods() {
|
||||
}
|
||||
}
|
||||
|
||||
func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
|
||||
targetNamespace := getNamespace(kubernetesProvider)
|
||||
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, targetNamespace), mizu.Config.Tap.PodRegex())
|
||||
func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, targetNamespaces []string, cancel context.CancelFunc) {
|
||||
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, targetNamespaces, mizu.Config.Tap.PodRegex())
|
||||
|
||||
restartTappers := func() {
|
||||
err, changeFound := updateCurrentlyTappedPods(kubernetesProvider, ctx, targetNamespace)
|
||||
err, changeFound := updateCurrentlyTappedPods(kubernetesProvider, ctx, targetNamespaces)
|
||||
if err != nil {
|
||||
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error getting pods by regex: %v", errormessage.FormatError(err)))
|
||||
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Failed to update currently tapped pods: %v", err))
|
||||
cancel()
|
||||
}
|
||||
|
||||
@@ -393,22 +400,27 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
|
||||
restartTappersDebouncer.SetOn()
|
||||
}
|
||||
|
||||
case <-errorChan:
|
||||
case err := <-errorChan:
|
||||
mizu.Log.Debugf("Watching pods loop, got error %v, stopping `restart tappers debouncer`", err)
|
||||
restartTappersDebouncer.Cancel()
|
||||
// TODO: Does this also perform cleanup?
|
||||
cancel()
|
||||
|
||||
case <-ctx.Done():
|
||||
mizu.Log.Debugf("Watching pods loop, context done, stopping `restart tappers debouncer`")
|
||||
restartTappersDebouncer.Cancel()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func updateCurrentlyTappedPods(kubernetesProvider *kubernetes.Provider, ctx context.Context, targetNamespace string) (error, bool) {
|
||||
func updateCurrentlyTappedPods(kubernetesProvider *kubernetes.Provider, ctx context.Context, targetNamespaces []string) (error, bool) {
|
||||
changeFound := false
|
||||
if matchingPods, err := kubernetesProvider.GetAllRunningPodsMatchingRegex(ctx, mizu.Config.Tap.PodRegex(), targetNamespace); err != nil {
|
||||
if matchingPods, err := kubernetesProvider.ListAllRunningPodsMatchingRegex(ctx, mizu.Config.Tap.PodRegex(), targetNamespaces); err != nil {
|
||||
return err, false
|
||||
} else {
|
||||
addedPods, removedPods := getPodArrayDiff(state.currentlyTappedPods, matchingPods)
|
||||
podsToTap := excludeMizuPods(matchingPods)
|
||||
addedPods, removedPods := getPodArrayDiff(state.currentlyTappedPods, podsToTap)
|
||||
for _, addedPod := range addedPods {
|
||||
changeFound = true
|
||||
mizu.Log.Infof(uiUtils.Green, fmt.Sprintf("+%s", addedPod.Name))
|
||||
@@ -417,12 +429,25 @@ func updateCurrentlyTappedPods(kubernetesProvider *kubernetes.Provider, ctx cont
|
||||
changeFound = true
|
||||
mizu.Log.Infof(uiUtils.Red, fmt.Sprintf("-%s", removedPod.Name))
|
||||
}
|
||||
state.currentlyTappedPods = matchingPods
|
||||
state.currentlyTappedPods = podsToTap
|
||||
}
|
||||
|
||||
return nil, changeFound
|
||||
}
|
||||
|
||||
func excludeMizuPods(pods []core.Pod) []core.Pod {
|
||||
mizuPrefixRegex := regexp.MustCompile("^" + mizu.MizuResourcesPrefix)
|
||||
|
||||
nonMizuPods := make([]core.Pod, 0)
|
||||
for _, pod := range pods {
|
||||
if !mizuPrefixRegex.MatchString(pod.Name) {
|
||||
nonMizuPods = append(nonMizuPods, pod)
|
||||
}
|
||||
}
|
||||
|
||||
return nonMizuPods
|
||||
}
|
||||
|
||||
func getPodArrayDiff(oldPods []core.Pod, newPods []core.Pod) (added []core.Pod, removed []core.Pod) {
|
||||
added = getMissingPods(newPods, oldPods)
|
||||
removed = getMissingPods(oldPods, newPods)
|
||||
@@ -450,28 +475,34 @@ func getMissingPods(pods1 []core.Pod, pods2 []core.Pod) []core.Pod {
|
||||
|
||||
func createProxyToApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
|
||||
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", mizu.ApiServerPodName))
|
||||
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, mizu.Config.ResourcesNamespace()), podExactRegex)
|
||||
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider, []string{mizu.Config.MizuResourcesNamespace}, podExactRegex)
|
||||
isPodReady := false
|
||||
timeAfter := time.After(25 * time.Second)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
mizu.Log.Debugf("Watching API Server pod loop, ctx done")
|
||||
return
|
||||
case <-added:
|
||||
mizu.Log.Debugf("Got agent pod added event")
|
||||
mizu.Log.Debugf("Watching API Server pod loop, added")
|
||||
continue
|
||||
case <-removed:
|
||||
mizu.Log.Infof("%s removed", mizu.ApiServerPodName)
|
||||
cancel()
|
||||
return
|
||||
case modifiedPod := <-modified:
|
||||
mizu.Log.Debugf("Got agent pod modified event, status phase: %v", modifiedPod.Status.Phase)
|
||||
if modifiedPod == nil {
|
||||
mizu.Log.Debugf("Watching API Server pod loop, modifiedPod with nil")
|
||||
continue
|
||||
}
|
||||
mizu.Log.Debugf("Watching API Server pod loop, modified: %v", modifiedPod.Status.Phase)
|
||||
if modifiedPod.Status.Phase == core.PodRunning && !isPodReady {
|
||||
isPodReady = true
|
||||
go func() {
|
||||
err := kubernetes.StartProxy(kubernetesProvider, mizu.Config.Tap.GuiPort, mizu.Config.ResourcesNamespace(), mizu.ApiServerPodName)
|
||||
err := kubernetes.StartProxy(kubernetesProvider, mizu.Config.Tap.GuiPort, mizu.Config.MizuResourcesNamespace, mizu.ApiServerPodName)
|
||||
if err != nil {
|
||||
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error occured while running k8s proxy %v", errormessage.FormatError(err)))
|
||||
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error occured while running k8s proxy %v\n"+
|
||||
"Try setting different port by using --%s", errormessage.FormatError(err), configStructs.GuiPortTapName))
|
||||
cancel()
|
||||
}
|
||||
}()
|
||||
@@ -484,11 +515,11 @@ func createProxyToApiServerPod(ctx context.Context, kubernetesProvider *kubernet
|
||||
}
|
||||
case <-timeAfter:
|
||||
if !isPodReady {
|
||||
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("%s pod was not ready in time", mizu.ApiServerPodName))
|
||||
mizu.Log.Errorf(uiUtils.Error, "Mizu API server was not ready in time")
|
||||
cancel()
|
||||
}
|
||||
case <-errorChan:
|
||||
mizu.Log.Debugf("[ERROR] Agent creation, watching %v namespace", mizu.Config.ResourcesNamespace())
|
||||
mizu.Log.Debugf("[ERROR] Agent creation, watching %v namespace", mizu.Config.MizuResourcesNamespace)
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
@@ -517,21 +548,15 @@ func requestForAnalysis() {
|
||||
}
|
||||
|
||||
func createRBACIfNecessary(ctx context.Context, kubernetesProvider *kubernetes.Provider) (bool, error) {
|
||||
mizuRBACExists, err := kubernetesProvider.DoesServiceAccountExist(ctx, mizu.Config.ResourcesNamespace(), mizu.ServiceAccountName)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if !mizuRBACExists {
|
||||
if mizu.Config.IsOwnNamespace() {
|
||||
err := kubernetesProvider.CreateMizuRBAC(ctx, mizu.Config.ResourcesNamespace(), mizu.ServiceAccountName, mizu.ClusterRoleName, mizu.ClusterRoleBindingName, mizu.RBACVersion)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
} else {
|
||||
err := kubernetesProvider.CreateMizuRBACNamespaceRestricted(ctx, mizu.Config.ResourcesNamespace(), mizu.ServiceAccountName, mizu.RoleName, mizu.RoleBindingName, mizu.RBACVersion)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if !mizu.Config.IsNsRestrictedMode() {
|
||||
err := kubernetesProvider.CreateMizuRBAC(ctx, mizu.Config.MizuResourcesNamespace, mizu.ServiceAccountName, mizu.ClusterRoleName, mizu.ClusterRoleBindingName, mizu.RBACVersion)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
} else {
|
||||
err := kubernetesProvider.CreateMizuRBACNamespaceRestricted(ctx, mizu.Config.MizuResourcesNamespace, mizu.ServiceAccountName, mizu.RoleName, mizu.RoleBindingName, mizu.RBACVersion)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
@@ -563,12 +588,12 @@ func waitForFinish(ctx context.Context, cancel context.CancelFunc) {
|
||||
}
|
||||
}
|
||||
|
||||
func getNamespace(kubernetesProvider *kubernetes.Provider) string {
|
||||
func getNamespaces(kubernetesProvider *kubernetes.Provider) []string {
|
||||
if mizu.Config.Tap.AllNamespaces {
|
||||
return mizu.K8sAllNamespaces
|
||||
} else if len(mizu.Config.Tap.Namespace) > 0 {
|
||||
return mizu.Config.Tap.Namespace
|
||||
return []string{mizu.K8sAllNamespaces}
|
||||
} else if len(mizu.Config.Tap.Namespaces) > 0 {
|
||||
return mizu.Config.Tap.Namespaces
|
||||
} else {
|
||||
return kubernetesProvider.CurrentNamespace()
|
||||
return []string{kubernetesProvider.CurrentNamespace()}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,31 +3,22 @@ package cmd
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/up9inc/mizu/cli/kubernetes"
|
||||
"github.com/up9inc/mizu/cli/mizu"
|
||||
"github.com/up9inc/mizu/cli/uiUtils"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
func runMizuView() {
|
||||
kubernetesProvider, err := kubernetes.NewProvider(mizu.Config.View.KubeConfigPath)
|
||||
if err != nil {
|
||||
if clientcmd.IsEmptyConfig(err) {
|
||||
mizu.Log.Infof("Couldn't find the kube config file, or file is empty. Try adding '--kube-config=<path to kube config file>'")
|
||||
return
|
||||
}
|
||||
if clientcmd.IsConfigurationInvalid(err) {
|
||||
mizu.Log.Infof(uiUtils.Red, "Invalid kube config file. Try using a different config with '--kube-config=<path to kube config file>'")
|
||||
return
|
||||
}
|
||||
mizu.Log.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
exists, err := kubernetesProvider.DoesServicesExist(ctx, mizu.Config.ResourcesNamespace(), mizu.ApiServerPodName)
|
||||
exists, err := kubernetesProvider.DoesServicesExist(ctx, mizu.Config.MizuResourcesNamespace, mizu.ApiServerPodName)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
@@ -45,8 +36,8 @@ func runMizuView() {
|
||||
mizu.Log.Infof("Found service %s, creating k8s proxy", mizu.ApiServerPodName)
|
||||
|
||||
mizu.Log.Infof("Mizu is available at http://%s\n", kubernetes.GetMizuApiServerProxiedHostAndPath(mizu.Config.View.GuiPort))
|
||||
err = kubernetes.StartProxy(kubernetesProvider, mizu.Config.View.GuiPort, mizu.Config.ResourcesNamespace(), mizu.ApiServerPodName)
|
||||
err = kubernetes.StartProxy(kubernetesProvider, mizu.Config.View.GuiPort, mizu.Config.MizuResourcesNamespace, mizu.ApiServerPodName)
|
||||
if err != nil {
|
||||
mizu.Log.Infof("Error occured while running k8s proxy %v", err)
|
||||
mizu.Log.Errorf("Error occurred while running k8s proxy %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,9 @@ package errormessage
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/up9inc/mizu/cli/mizu"
|
||||
|
||||
regexpsyntax "regexp/syntax"
|
||||
|
||||
k8serrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
@@ -13,9 +16,11 @@ import (
|
||||
func FormatError(err error) error {
|
||||
var errorNew error
|
||||
if k8serrors.IsForbidden(err) {
|
||||
errorNew = fmt.Errorf("Insufficient permissions: %w. Supply the required permission or control Mizu's access to namespaces by setting MizuNamespace in the config file or setting the tapped namespace with --set mizu-namespace=<NAMEPSACE>.", err)
|
||||
errorNew = fmt.Errorf("insufficient permissions: %w. "+
|
||||
"supply the required permission or control Mizu's access to namespaces by setting MizuResourcesNamespace "+
|
||||
"in the config file or setting the tapped namespace with --%s %s=<NAMEPSACE>", err, mizu.SetCommandName, mizu.MizuResourcesNamespaceConfigName)
|
||||
} else if syntaxError, isSyntaxError := asRegexSyntaxError(err); isSyntaxError {
|
||||
errorNew = fmt.Errorf("Regex %s is invalid: %w", syntaxError.Expr, err)
|
||||
errorNew = fmt.Errorf("regex %s is invalid: %w", syntaxError.Expr, err)
|
||||
} else {
|
||||
errorNew = err
|
||||
}
|
||||
|
||||
26
cli/fsUtils/dirUtils.go
Normal file
26
cli/fsUtils/dirUtils.go
Normal file
@@ -0,0 +1,26 @@
|
||||
package fsUtils
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
)
|
||||
|
||||
func EnsureDir(dirName string) error {
|
||||
err := os.Mkdir(dirName, 0700)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
if os.IsExist(err) {
|
||||
// check that the existing path is a directory
|
||||
info, err := os.Stat(dirName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !info.IsDir() {
|
||||
return errors.New(fmt.Sprintf("path exists but is not a directory: %s", dirName))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
58
cli/fsUtils/mizuLogsUtils.go
Normal file
58
cli/fsUtils/mizuLogsUtils.go
Normal file
@@ -0,0 +1,58 @@
|
||||
package fsUtils
|
||||
|
||||
import (
|
||||
"archive/zip"
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/up9inc/mizu/cli/kubernetes"
|
||||
"github.com/up9inc/mizu/cli/mizu"
|
||||
"os"
|
||||
"regexp"
|
||||
)
|
||||
|
||||
func DumpLogs(provider *kubernetes.Provider, ctx context.Context, filePath string) error {
|
||||
podExactRegex := regexp.MustCompile("^" + mizu.MizuResourcesPrefix)
|
||||
pods, err := provider.ListAllPodsMatchingRegex(ctx, podExactRegex, []string{mizu.Config.MizuResourcesNamespace})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(pods) == 0 {
|
||||
return fmt.Errorf("no mizu pods found in namespace %s", mizu.Config.MizuResourcesNamespace)
|
||||
}
|
||||
|
||||
newZipFile, err := os.Create(filePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer newZipFile.Close()
|
||||
zipWriter := zip.NewWriter(newZipFile)
|
||||
defer zipWriter.Close()
|
||||
|
||||
for _, pod := range pods {
|
||||
logs, err := provider.GetPodLogs(pod.Namespace, pod.Name, ctx)
|
||||
if err != nil {
|
||||
mizu.Log.Errorf("Failed to get logs, %v", err)
|
||||
continue
|
||||
} else {
|
||||
mizu.Log.Debugf("Successfully read log length %d for pod: %s.%s", len(logs), pod.Namespace, pod.Name)
|
||||
}
|
||||
if err := AddStrToZip(zipWriter, logs, fmt.Sprintf("%s.%s.log", pod.Namespace, pod.Name)); err != nil {
|
||||
mizu.Log.Errorf("Failed write logs, %v", err)
|
||||
} else {
|
||||
mizu.Log.Infof("Successfully added log length %d from pod: %s.%s", len(logs), pod.Namespace, pod.Name)
|
||||
}
|
||||
}
|
||||
if err := AddFileToZip(zipWriter, mizu.GetConfigFilePath()); err != nil {
|
||||
mizu.Log.Debugf("Failed write file, %v", err)
|
||||
} else {
|
||||
mizu.Log.Infof("Successfully added file %s", mizu.GetConfigFilePath())
|
||||
}
|
||||
if err := AddFileToZip(zipWriter, mizu.GetLogFilePath()); err != nil {
|
||||
mizu.Log.Debugf("Failed write file, %v", err)
|
||||
} else {
|
||||
mizu.Log.Infof("Successfully added file %s", mizu.GetLogFilePath())
|
||||
}
|
||||
mizu.Log.Infof("You can find the zip with all logs in %s\n", filePath)
|
||||
return nil
|
||||
}
|
||||
55
cli/fsUtils/zipUtils.go
Normal file
55
cli/fsUtils/zipUtils.go
Normal file
@@ -0,0 +1,55 @@
|
||||
package fsUtils
|
||||
|
||||
import (
|
||||
"archive/zip"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
)
|
||||
|
||||
func AddFileToZip(zipWriter *zip.Writer, filename string) error {
|
||||
|
||||
fileToZip, err := os.Open(filename)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open file %s, %w", filename, err)
|
||||
}
|
||||
defer fileToZip.Close()
|
||||
|
||||
// Get the file information
|
||||
info, err := fileToZip.Stat()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get file information %s, %w", filename, err)
|
||||
}
|
||||
|
||||
header, err := zip.FileInfoHeader(info)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Using FileInfoHeader() above only uses the basename of the file. If we want
|
||||
// to preserve the folder structure we can overwrite this with the full path.
|
||||
header.Name = filepath.Base(filename)
|
||||
|
||||
// Change to deflate to gain better compression
|
||||
// see http://golang.org/pkg/archive/zip/#pkg-constants
|
||||
header.Method = zip.Deflate
|
||||
|
||||
writer, err := zipWriter.CreateHeader(header)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create header in zip for %s, %w", filename, err)
|
||||
}
|
||||
_, err = io.Copy(writer, fileToZip)
|
||||
return err
|
||||
}
|
||||
|
||||
func AddStrToZip(writer *zip.Writer, logs string, fileName string) error {
|
||||
if zipFile, err := writer.Create(fileName); err != nil {
|
||||
return fmt.Errorf("couldn't create a log file inside zip for %s, %w", fileName, err)
|
||||
} else {
|
||||
if _, err = zipFile.Write([]byte(logs)); err != nil {
|
||||
return fmt.Errorf("couldn't write logs to zip file: %s, %w", fileName, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
25
cli/goUtils/funcWrappers.go
Normal file
25
cli/goUtils/funcWrappers.go
Normal file
@@ -0,0 +1,25 @@
|
||||
package goUtils
|
||||
|
||||
import (
|
||||
"github.com/up9inc/mizu/cli/mizu"
|
||||
"reflect"
|
||||
"runtime/debug"
|
||||
)
|
||||
|
||||
func HandleExcWrapper(fn interface{}, params ...interface{}) (result []reflect.Value) {
|
||||
defer func() {
|
||||
if panicMessage := recover(); panicMessage != nil {
|
||||
stack := debug.Stack()
|
||||
mizu.Log.Fatalf("Unhandled panic: %v\n stack: %s", panicMessage, stack)
|
||||
}
|
||||
}()
|
||||
f := reflect.ValueOf(fn)
|
||||
if f.Type().NumIn() != len(params) {
|
||||
panic("incorrect number of parameters!")
|
||||
}
|
||||
inputs := make([]reflect.Value, len(params))
|
||||
for k, in := range params {
|
||||
inputs[k] = reflect.ValueOf(in)
|
||||
}
|
||||
return f.Call(inputs)
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
package kubernetes
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
_ "bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
@@ -13,6 +14,7 @@ import (
|
||||
|
||||
"github.com/up9inc/mizu/cli/mizu"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"io"
|
||||
core "k8s.io/api/core/v1"
|
||||
rbac "k8s.io/api/rbac/v1"
|
||||
k8serrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
@@ -52,7 +54,12 @@ func NewProvider(kubeConfigPath string) (*Provider, error) {
|
||||
kubernetesConfig := loadKubernetesConfiguration(kubeConfigPath)
|
||||
restClientConfig, err := kubernetesConfig.ClientConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
if clientcmd.IsEmptyConfig(err) {
|
||||
return nil, fmt.Errorf("Couldn't find the kube config file, or file is empty. Try adding '--kube-config=<path to kube config file>'\n")
|
||||
}
|
||||
if clientcmd.IsConfigurationInvalid(err) {
|
||||
return nil, fmt.Errorf("Invalid kube config file. Try using a different config with '--kube-config=<path to kube config file>'\n")
|
||||
}
|
||||
}
|
||||
clientSet := getClientSet(restClientConfig)
|
||||
|
||||
@@ -350,15 +357,15 @@ func (provider *Provider) CreateMizuRBAC(ctx context.Context, namespace string,
|
||||
},
|
||||
}
|
||||
_, err := provider.clientSet.CoreV1().ServiceAccounts(namespace).Create(ctx, serviceAccount, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
if err != nil && !k8serrors.IsAlreadyExists(err) {
|
||||
return err
|
||||
}
|
||||
_, err = provider.clientSet.RbacV1().ClusterRoles().Create(ctx, clusterRole, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
if err != nil && !k8serrors.IsAlreadyExists(err) {
|
||||
return err
|
||||
}
|
||||
_, err = provider.clientSet.RbacV1().ClusterRoleBindings().Create(ctx, clusterRoleBinding, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
if err != nil && !k8serrors.IsAlreadyExists(err) {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
@@ -404,15 +411,15 @@ func (provider *Provider) CreateMizuRBACNamespaceRestricted(ctx context.Context,
|
||||
},
|
||||
}
|
||||
_, err := provider.clientSet.CoreV1().ServiceAccounts(namespace).Create(ctx, serviceAccount, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
if err != nil && !k8serrors.IsAlreadyExists(err) {
|
||||
return err
|
||||
}
|
||||
_, err = provider.clientSet.RbacV1().Roles(namespace).Create(ctx, role, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
if err != nil && !k8serrors.IsAlreadyExists(err) {
|
||||
return err
|
||||
}
|
||||
_, err = provider.clientSet.RbacV1().RoleBindings(namespace).Create(ctx, roleBinding, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
if err != nil && !k8serrors.IsAlreadyExists(err) {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
@@ -551,7 +558,6 @@ func (provider *Provider) CreateConfigMap(ctx context.Context, namespace string,
|
||||
if _, err := provider.clientSet.CoreV1().ConfigMaps(namespace).Create(ctx, configMap, metav1.CreateOptions{}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -671,20 +677,57 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
|
||||
return err
|
||||
}
|
||||
|
||||
func (provider *Provider) GetAllRunningPodsMatchingRegex(ctx context.Context, regex *regexp.Regexp, namespace string) ([]core.Pod, error) {
|
||||
pods, err := provider.clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
func (provider *Provider) ListAllPodsMatchingRegex(ctx context.Context, regex *regexp.Regexp, namespaces []string) ([]core.Pod, error) {
|
||||
var pods []core.Pod
|
||||
for _, namespace := range namespaces {
|
||||
namespacePods, err := provider.clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get pods in ns: [%s], %w", namespace, err)
|
||||
}
|
||||
|
||||
pods = append(pods, namespacePods.Items...)
|
||||
}
|
||||
|
||||
matchingPods := make([]core.Pod, 0)
|
||||
for _, pod := range pods.Items {
|
||||
if regex.MatchString(pod.Name) && isPodRunning(&pod) {
|
||||
for _, pod := range pods {
|
||||
if regex.MatchString(pod.Name) {
|
||||
matchingPods = append(matchingPods, pod)
|
||||
}
|
||||
}
|
||||
return matchingPods, nil
|
||||
}
|
||||
|
||||
func (provider *Provider) ListAllRunningPodsMatchingRegex(ctx context.Context, regex *regexp.Regexp, namespaces []string) ([]core.Pod, error) {
|
||||
pods, err := provider.ListAllPodsMatchingRegex(ctx, regex, namespaces)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
matchingPods := make([]core.Pod, 0)
|
||||
for _, pod := range pods {
|
||||
if isPodRunning(&pod) {
|
||||
matchingPods = append(matchingPods, pod)
|
||||
}
|
||||
}
|
||||
return matchingPods, nil
|
||||
}
|
||||
|
||||
func (provider *Provider) GetPodLogs(namespace string, podName string, ctx context.Context) (string, error) {
|
||||
podLogOpts := core.PodLogOptions{}
|
||||
req := provider.clientSet.CoreV1().Pods(namespace).GetLogs(podName, &podLogOpts)
|
||||
podLogs, err := req.Stream(ctx)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("error opening log stream on ns: %s, pod: %s, %w", namespace, podName, err)
|
||||
}
|
||||
defer podLogs.Close()
|
||||
buf := new(bytes.Buffer)
|
||||
if _, err = io.Copy(buf, podLogs); err != nil {
|
||||
return "", fmt.Errorf("error copy information from podLogs to buf, ns: %s, pod: %s, %w", namespace, podName, err)
|
||||
}
|
||||
str := buf.String()
|
||||
return str, nil
|
||||
}
|
||||
|
||||
func getClientSet(config *restclient.Config) *kubernetes.Clientset {
|
||||
clientSet, err := kubernetes.NewForConfig(config)
|
||||
if err != nil {
|
||||
|
||||
@@ -4,49 +4,64 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"regexp"
|
||||
"sync"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
)
|
||||
|
||||
func FilteredWatch(ctx context.Context, watcher watch.Interface, podFilter *regexp.Regexp) (chan *corev1.Pod, chan *corev1.Pod, chan *corev1.Pod, chan error) {
|
||||
func FilteredWatch(ctx context.Context, kubernetesProvider *Provider, targetNamespaces []string, podFilter *regexp.Regexp) (chan *corev1.Pod, chan *corev1.Pod, chan *corev1.Pod, chan error) {
|
||||
addedChan := make(chan *corev1.Pod)
|
||||
modifiedChan := make(chan *corev1.Pod)
|
||||
removedChan := make(chan *corev1.Pod)
|
||||
errorChan := make(chan error)
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case e := <-watcher.ResultChan():
|
||||
|
||||
if e.Object == nil {
|
||||
errorChan <- errors.New("kubernetes pod watch failed")
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for _, targetNamespace := range targetNamespaces {
|
||||
wg.Add(1)
|
||||
|
||||
go func(targetNamespace string) {
|
||||
defer wg.Done()
|
||||
watcher := kubernetesProvider.GetPodWatcher(ctx, targetNamespace)
|
||||
|
||||
for {
|
||||
select {
|
||||
case e := <-watcher.ResultChan():
|
||||
if e.Object == nil {
|
||||
errorChan <- errors.New("kubernetes pod watch failed")
|
||||
return
|
||||
}
|
||||
|
||||
pod := e.Object.(*corev1.Pod)
|
||||
|
||||
if !podFilter.MatchString(pod.Name) {
|
||||
continue
|
||||
}
|
||||
|
||||
switch e.Type {
|
||||
case watch.Added:
|
||||
addedChan <- pod
|
||||
case watch.Modified:
|
||||
modifiedChan <- pod
|
||||
case watch.Deleted:
|
||||
removedChan <- pod
|
||||
}
|
||||
case <-ctx.Done():
|
||||
watcher.Stop()
|
||||
return
|
||||
}
|
||||
|
||||
pod := e.Object.(*corev1.Pod)
|
||||
|
||||
if !podFilter.MatchString(pod.Name) {
|
||||
continue
|
||||
}
|
||||
|
||||
switch e.Type {
|
||||
case watch.Added:
|
||||
addedChan <- pod
|
||||
case watch.Modified:
|
||||
modifiedChan <- pod
|
||||
case watch.Deleted:
|
||||
removedChan <- pod
|
||||
}
|
||||
case <-ctx.Done():
|
||||
watcher.Stop()
|
||||
close(addedChan)
|
||||
close(modifiedChan)
|
||||
close(removedChan)
|
||||
close(errorChan)
|
||||
return
|
||||
}
|
||||
}
|
||||
}(targetNamespace)
|
||||
}
|
||||
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
wg.Wait()
|
||||
close(addedChan)
|
||||
close(modifiedChan)
|
||||
close(removedChan)
|
||||
close(errorChan)
|
||||
}()
|
||||
|
||||
return addedChan, modifiedChan, removedChan, errorChan
|
||||
|
||||
@@ -2,10 +2,9 @@ package main
|
||||
|
||||
import (
|
||||
"github.com/up9inc/mizu/cli/cmd"
|
||||
"github.com/up9inc/mizu/cli/mizu"
|
||||
"github.com/up9inc/mizu/cli/goUtils"
|
||||
)
|
||||
|
||||
func main() {
|
||||
mizu.InitLogger()
|
||||
cmd.Execute()
|
||||
goUtils.HandleExcWrapper(cmd.Execute)
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"github.com/creasty/defaults"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/pflag"
|
||||
"github.com/up9inc/mizu/cli/mizu/configStructs"
|
||||
"github.com/up9inc/mizu/cli/uiUtils"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
@@ -22,16 +23,37 @@ const (
|
||||
SetCommandName = "set"
|
||||
)
|
||||
|
||||
var allowedSetFlags = []string{
|
||||
AgentImageConfigName,
|
||||
MizuResourcesNamespaceConfigName,
|
||||
TelemetryConfigName,
|
||||
DumpLogsConfigName,
|
||||
KubeConfigPathName,
|
||||
configStructs.AnalysisDestinationTapName,
|
||||
configStructs.SleepIntervalSecTapName,
|
||||
}
|
||||
|
||||
var Config = ConfigStruct{}
|
||||
|
||||
func (config *ConfigStruct) Validate() error {
|
||||
if config.IsNsRestrictedMode() {
|
||||
if config.Tap.AllNamespaces || len(config.Tap.Namespaces) != 1 || config.Tap.Namespaces[0] != config.MizuResourcesNamespace {
|
||||
return fmt.Errorf("Not supported mode. Mizu can't resolve IPs in other namespaces when running in namespace restricted mode.\n"+
|
||||
"You can use the same namespace for --%s and --%s", configStructs.NamespacesTapName, MizuResourcesNamespaceConfigName)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func InitConfig(cmd *cobra.Command) error {
|
||||
if err := defaults.Set(&Config); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := mergeConfigFile(); err != nil {
|
||||
Log.Errorf("Could not load config file, error %v", err)
|
||||
Log.Fatalf("You can regenerate the file using `mizu config -r` or just remove it %v", GetConfigFilePath())
|
||||
return fmt.Errorf("invalid config %w\n"+
|
||||
"you can regenerate the file using `mizu config -r` or just remove it %v", err, GetConfigFilePath())
|
||||
}
|
||||
|
||||
cmd.Flags().Visit(initFlag)
|
||||
@@ -51,7 +73,7 @@ func GetConfigWithDefaults() (string, error) {
|
||||
}
|
||||
|
||||
func GetConfigFilePath() string {
|
||||
return path.Join(getMizuFolderPath(), "config.yaml")
|
||||
return path.Join(GetMizuFolderPath(), "config.yaml")
|
||||
}
|
||||
|
||||
func mergeConfigFile() error {
|
||||
@@ -83,33 +105,34 @@ func initFlag(f *pflag.Flag) {
|
||||
}
|
||||
|
||||
if f.Name == SetCommandName {
|
||||
if setError := mergeSetFlag(sliceValue.GetSlice()); setError != nil {
|
||||
Log.Infof(uiUtils.Red, "Invalid set argument")
|
||||
}
|
||||
mergeSetFlag(sliceValue.GetSlice())
|
||||
return
|
||||
}
|
||||
|
||||
mergeFlagValues(configElem, f.Name, sliceValue.GetSlice())
|
||||
}
|
||||
|
||||
func mergeSetFlag(setValues []string) error {
|
||||
func mergeSetFlag(setValues []string) {
|
||||
configElem := reflect.ValueOf(&Config).Elem()
|
||||
|
||||
for _, setValue := range setValues {
|
||||
if !strings.Contains(setValue, Separator) {
|
||||
return errors.New(fmt.Sprintf("invalid set argument %s", setValue))
|
||||
Log.Warningf(uiUtils.Warning, fmt.Sprintf("Ignoring set argument %s (set argument format: <flag name>=<flag value>)", setValue))
|
||||
}
|
||||
|
||||
split := strings.SplitN(setValue, Separator, 2)
|
||||
if len(split) != 2 {
|
||||
return errors.New(fmt.Sprintf("invalid set argument %s", setValue))
|
||||
Log.Warningf(uiUtils.Warning, fmt.Sprintf("Ignoring set argument %s (set argument format: <flag name>=<flag value>)", setValue))
|
||||
}
|
||||
|
||||
argumentKey, argumentValue := split[0], split[1]
|
||||
|
||||
if !Contains(allowedSetFlags, argumentKey) {
|
||||
Log.Warningf(uiUtils.Warning, fmt.Sprintf("Ignoring set argument %s, flag name must be one of the following: \"%s\"", setValue, strings.Join(allowedSetFlags, "\", \"")))
|
||||
}
|
||||
|
||||
mergeFlagValue(configElem, argumentKey, argumentValue)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func mergeFlagValue(currentElem reflect.Value, flagKey string, flagValue string) {
|
||||
@@ -130,7 +153,7 @@ func mergeFlagValue(currentElem reflect.Value, flagKey string, flagValue string)
|
||||
|
||||
parsedValue, err := getParsedValue(flagValueKind, flagValue)
|
||||
if err != nil {
|
||||
Log.Warningf(uiUtils.Red, fmt.Sprintf("Invalid value %v for key %s, expected %s", flagValue, flagKey, flagValueKind))
|
||||
Log.Warningf(uiUtils.Red, fmt.Sprintf("Invalid value %v for flag name %s, expected %s", flagValue, flagKey, flagValueKind))
|
||||
return
|
||||
}
|
||||
|
||||
@@ -158,7 +181,7 @@ func mergeFlagValues(currentElem reflect.Value, flagKey string, flagValues []str
|
||||
for _, flagValue := range flagValues {
|
||||
parsedValue, err := getParsedValue(flagValueKind, flagValue)
|
||||
if err != nil {
|
||||
Log.Warningf(uiUtils.Red, fmt.Sprintf("Invalid value %v for key %s, expected %s", flagValue, flagKey, flagValueKind))
|
||||
Log.Warningf(uiUtils.Red, fmt.Sprintf("Invalid value %v for flag name %s, expected %s", flagValue, flagKey, flagValueKind))
|
||||
return
|
||||
}
|
||||
|
||||
@@ -180,14 +203,70 @@ func getParsedValue(kind reflect.Kind, value string) (reflect.Value, error) {
|
||||
}
|
||||
|
||||
return reflect.ValueOf(boolArgumentValue), nil
|
||||
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
|
||||
case reflect.Int:
|
||||
intArgumentValue, err := strconv.ParseInt(value, 10, 64)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
return reflect.ValueOf(int(intArgumentValue)), nil
|
||||
case reflect.Int8:
|
||||
intArgumentValue, err := strconv.ParseInt(value, 10, 8)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
return reflect.ValueOf(int8(intArgumentValue)), nil
|
||||
case reflect.Int16:
|
||||
intArgumentValue, err := strconv.ParseInt(value, 10, 16)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
return reflect.ValueOf(int16(intArgumentValue)), nil
|
||||
case reflect.Int32:
|
||||
intArgumentValue, err := strconv.ParseInt(value, 10, 32)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
return reflect.ValueOf(int32(intArgumentValue)), nil
|
||||
case reflect.Int64:
|
||||
intArgumentValue, err := strconv.ParseInt(value, 10, 64)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
return reflect.ValueOf(intArgumentValue), nil
|
||||
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
|
||||
case reflect.Uint:
|
||||
uintArgumentValue, err := strconv.ParseUint(value, 10, 64)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
return reflect.ValueOf(uint(uintArgumentValue)), nil
|
||||
case reflect.Uint8:
|
||||
uintArgumentValue, err := strconv.ParseUint(value, 10, 8)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
return reflect.ValueOf(uint8(uintArgumentValue)), nil
|
||||
case reflect.Uint16:
|
||||
uintArgumentValue, err := strconv.ParseUint(value, 10, 16)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
return reflect.ValueOf(uint16(uintArgumentValue)), nil
|
||||
case reflect.Uint32:
|
||||
uintArgumentValue, err := strconv.ParseUint(value, 10, 32)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
return reflect.ValueOf(uint32(uintArgumentValue)), nil
|
||||
case reflect.Uint64:
|
||||
uintArgumentValue, err := strconv.ParseUint(value, 10, 64)
|
||||
if err != nil {
|
||||
break
|
||||
|
||||
@@ -6,32 +6,30 @@ import (
|
||||
"github.com/up9inc/mizu/cli/mizu/configStructs"
|
||||
)
|
||||
|
||||
const (
|
||||
AgentImageConfigName = "agent-image"
|
||||
MizuResourcesNamespaceConfigName = "mizu-resources-namespace"
|
||||
TelemetryConfigName = "telemetry"
|
||||
DumpLogsConfigName = "dump-logs"
|
||||
KubeConfigPathName = "kube-config-path"
|
||||
)
|
||||
|
||||
type ConfigStruct struct {
|
||||
Tap configStructs.TapConfig `yaml:"tap"`
|
||||
Fetch configStructs.FetchConfig `yaml:"fetch"`
|
||||
Version configStructs.VersionConfig `yaml:"version"`
|
||||
View configStructs.ViewConfig `yaml:"view"`
|
||||
MizuImage string `yaml:"mizu-image"`
|
||||
MizuNamespace string `yaml:"mizu-namespace"`
|
||||
Telemetry bool `yaml:"telemetry" default:"true"`
|
||||
Tap configStructs.TapConfig `yaml:"tap"`
|
||||
Fetch configStructs.FetchConfig `yaml:"fetch"`
|
||||
Version configStructs.VersionConfig `yaml:"version"`
|
||||
View configStructs.ViewConfig `yaml:"view"`
|
||||
AgentImage string `yaml:"agent-image"`
|
||||
MizuResourcesNamespace string `yaml:"mizu-resources-namespace" default:"mizu"`
|
||||
Telemetry bool `yaml:"telemetry" default:"true"`
|
||||
DumpLogs bool `yaml:"dump-logs" default:"false"`
|
||||
KubeConfigPath string `yaml:"kube-config-path" default:""`
|
||||
}
|
||||
|
||||
func (config *ConfigStruct) SetDefaults() {
|
||||
config.MizuImage = fmt.Sprintf("gcr.io/up9-docker-hub/mizu/%s:%s", Branch, SemVer)
|
||||
config.AgentImage = fmt.Sprintf("gcr.io/up9-docker-hub/mizu/%s:%s", Branch, SemVer)
|
||||
}
|
||||
|
||||
func (config *ConfigStruct) ResourcesNamespace() string {
|
||||
if config.MizuNamespace == "" {
|
||||
return ResourcesDefaultNamespace
|
||||
}
|
||||
|
||||
return config.MizuNamespace
|
||||
}
|
||||
|
||||
func (config *ConfigStruct) IsOwnNamespace() bool {
|
||||
if config.MizuNamespace == "" {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
func (config *ConfigStruct) IsNsRestrictedMode() bool {
|
||||
return config.MizuResourcesNamespace != "mizu" // Notice "mizu" string must match the default MizuResourcesNamespace
|
||||
}
|
||||
|
||||
@@ -10,11 +10,12 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
AnalysisDestinationTapName = "dest"
|
||||
SleepIntervalSecTapName = "upload-interval"
|
||||
GuiPortTapName = "gui-port"
|
||||
NamespaceTapName = "namespace"
|
||||
NamespacesTapName = "namespaces"
|
||||
AnalysisTapName = "analysis"
|
||||
AllNamespacesTapName = "all-namespaces"
|
||||
KubeConfigPathTapName = "kube-config"
|
||||
PlainTextFilterRegexesTapName = "regex-masking"
|
||||
HideHealthChecksTapName = "hide-healthchecks"
|
||||
DisableRedactionTapName = "no-redact"
|
||||
@@ -29,10 +30,9 @@ type TapConfig struct {
|
||||
SleepIntervalSec int `yaml:"upload-interval" default:"10"`
|
||||
PodRegexStr string `yaml:"regex" default:".*"`
|
||||
GuiPort uint16 `yaml:"gui-port" default:"8899"`
|
||||
Namespace string `yaml:"namespace"`
|
||||
Namespaces []string `yaml:"namespaces"`
|
||||
Analysis bool `yaml:"analysis" default:"false"`
|
||||
AllNamespaces bool `yaml:"all-namespaces" default:"false"`
|
||||
KubeConfigPath string `yaml:"kube-config"`
|
||||
PlainTextFilterRegexes []string `yaml:"regex-masking"`
|
||||
HideHealthChecks bool `yaml:"hide-healthchecks" default:"false"`
|
||||
DisableRedaction bool `yaml:"no-redact" default:"false"`
|
||||
|
||||
@@ -14,20 +14,20 @@ var (
|
||||
)
|
||||
|
||||
const (
|
||||
ApiServerPodName = "mizu-api-server"
|
||||
ClusterRoleBindingName = "mizu-cluster-role-binding"
|
||||
ClusterRoleName = "mizu-cluster-role"
|
||||
K8sAllNamespaces = ""
|
||||
ResourcesDefaultNamespace = "mizu"
|
||||
RoleBindingName = "mizu-role-binding"
|
||||
RoleName = "mizu-role"
|
||||
ServiceAccountName = "mizu-service-account"
|
||||
TapperDaemonSetName = "mizu-tapper-daemon-set"
|
||||
TapperPodName = "mizu-tapper"
|
||||
ConfigMapName = "mizu-policy"
|
||||
MizuResourcesPrefix = "mizu-"
|
||||
ApiServerPodName = MizuResourcesPrefix + "api-server"
|
||||
ClusterRoleBindingName = MizuResourcesPrefix + "cluster-role-binding"
|
||||
ClusterRoleName = MizuResourcesPrefix + "cluster-role"
|
||||
K8sAllNamespaces = ""
|
||||
RoleBindingName = MizuResourcesPrefix + "role-binding"
|
||||
RoleName = MizuResourcesPrefix + "role"
|
||||
ServiceAccountName = MizuResourcesPrefix + "service-account"
|
||||
TapperDaemonSetName = MizuResourcesPrefix + "tapper-daemon-set"
|
||||
TapperPodName = MizuResourcesPrefix + "tapper"
|
||||
ConfigMapName = MizuResourcesPrefix + "policy"
|
||||
)
|
||||
|
||||
func getMizuFolderPath() string {
|
||||
func GetMizuFolderPath() string {
|
||||
home, homeDirErr := os.UserHomeDir()
|
||||
if homeDirErr != nil {
|
||||
return ""
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package mizu
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/op/go-logging"
|
||||
"os"
|
||||
"path"
|
||||
@@ -13,15 +12,15 @@ var format = logging.MustStringFormatter(
|
||||
`%{time} %{level:.5s} ▶ %{pid} %{shortfile} %{shortfunc} ▶ %{message}`,
|
||||
)
|
||||
|
||||
func GetLogFilePath() string {
|
||||
return path.Join(GetMizuFolderPath(), "mizu_cli.log")
|
||||
}
|
||||
|
||||
func InitLogger() {
|
||||
mizuDirPath := getMizuFolderPath()
|
||||
if err := os.MkdirAll(mizuDirPath, os.ModePerm); err != nil {
|
||||
panic(fmt.Sprintf("Failed creating mizu dir: %v, err %v", mizuDirPath, err))
|
||||
}
|
||||
logPath := path.Join(mizuDirPath, "log.log")
|
||||
logPath := GetLogFilePath()
|
||||
f, err := os.OpenFile(logPath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Failed mizu log file: %v, err %v", logPath, err))
|
||||
Log.Infof("Failed to open mizu log file: %v, err %v", logPath, err)
|
||||
}
|
||||
|
||||
fileLog := logging.NewLogBackend(f, "", 0)
|
||||
|
||||
11
cli/mizu/sliceUtils.go
Normal file
11
cli/mizu/sliceUtils.go
Normal file
@@ -0,0 +1,11 @@
|
||||
package mizu
|
||||
|
||||
func Contains(slice []string, containsValue string) bool {
|
||||
for _, sliceValue := range slice {
|
||||
if sliceValue == containsValue {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
package debounce
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -13,9 +14,10 @@ func NewDebouncer(timeout time.Duration, callback func()) *Debouncer {
|
||||
|
||||
type Debouncer struct {
|
||||
callback func()
|
||||
running bool
|
||||
timeout time.Duration
|
||||
timer *time.Timer
|
||||
running bool
|
||||
canceled bool
|
||||
timeout time.Duration
|
||||
timer *time.Timer
|
||||
}
|
||||
|
||||
func (d *Debouncer) setTimeout(timeout time.Duration) {
|
||||
@@ -25,18 +27,28 @@ func (d *Debouncer) setTimeout(timeout time.Duration) {
|
||||
|
||||
func (d *Debouncer) setCallback(callback func()) {
|
||||
callbackWrapped := func() {
|
||||
callback()
|
||||
if !d.canceled {
|
||||
callback()
|
||||
}
|
||||
d.running = false
|
||||
}
|
||||
|
||||
d.callback = callbackWrapped
|
||||
}
|
||||
|
||||
func (d *Debouncer) SetOn() {
|
||||
func (d *Debouncer) Cancel() {
|
||||
d.canceled = true
|
||||
}
|
||||
|
||||
func (d *Debouncer) SetOn() error {
|
||||
if d.canceled {
|
||||
return fmt.Errorf("debouncer cancelled")
|
||||
}
|
||||
if d.running == true {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
d.running = true
|
||||
d.timer = time.AfterFunc(d.timeout, d.callback)
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user