Merge pull request #50 from replicatedhq/run-in-cli

Run in cli
This commit is contained in:
Marc Campbell
2019-08-14 20:35:17 -07:00
committed by GitHub
15 changed files with 180 additions and 469 deletions

View File

@@ -1,6 +1,7 @@
package cli
import (
"fmt"
"io/ioutil"
"github.com/replicatedhq/troubleshoot/pkg/collect"
@@ -25,14 +26,22 @@ func Run() *cobra.Command {
return err
}
collector := collect.Collector{
Spec: string(specContents),
Redact: v.GetBool("redact"),
}
if err := collector.RunCollectorSync(); err != nil {
c, err := collect.ParseSpec(string(specContents))
if err != nil {
return err
}
collector := collect.Collector{
Collect: c,
Redact: v.GetBool("redact"),
}
b, err := collector.RunCollectorSync()
if err != nil {
return err
}
fmt.Printf("%s", b)
return nil
},
}

View File

@@ -1,12 +1,9 @@
package cli
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
@@ -18,22 +15,11 @@ import (
"github.com/pkg/errors"
analyzerunner "github.com/replicatedhq/troubleshoot/pkg/analyze"
troubleshootv1beta1 "github.com/replicatedhq/troubleshoot/pkg/apis/troubleshoot/v1beta1"
collectrunner "github.com/replicatedhq/troubleshoot/pkg/collect"
"github.com/replicatedhq/troubleshoot/pkg/collect"
"github.com/replicatedhq/troubleshoot/pkg/logger"
"github.com/spf13/viper"
"github.com/tj/go-spin"
"gopkg.in/yaml.v2"
corev1 "k8s.io/api/core/v1"
kuberneteserrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
types "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
)
func runPreflightsNoCRD(v *viper.Viper, arg string) error {
@@ -143,64 +129,6 @@ func runPreflightsNoCRD(v *viper.Viper, arg string) error {
}
func runCollectors(v *viper.Viper, preflight troubleshootv1beta1.Preflight) (map[string][]byte, error) {
cfg, err := config.GetConfig()
if err != nil {
return nil, err
}
client, err := client.New(cfg, client.Options{})
if err != nil {
return nil, err
}
clientset, err := kubernetes.NewForConfig(cfg)
if err != nil {
return nil, err
}
restClient := clientset.CoreV1().RESTClient()
serviceAccountName := v.GetString("serviceaccount")
if serviceAccountName == "" {
generatedServiceAccountName, err := createServiceAccount(preflight, v.GetString("namespace"), clientset)
if err != nil {
return nil, err
}
defer removeServiceAccount(generatedServiceAccountName, v.GetString("namespace"), clientset)
serviceAccountName = generatedServiceAccountName
}
// deploy an object that "owns" everything to aid in cleanup
configMapNamespacedName := types.NamespacedName{
Name: fmt.Sprintf("preflight-%s-owner", preflight.Name),
Namespace: v.GetString("namespace"),
}
foundConfigMap := &corev1.ConfigMap{}
err = client.Get(context.Background(), configMapNamespacedName, foundConfigMap)
if err == nil || !kuberneteserrors.IsNotFound(err) {
return nil, errors.Wrap(err, "failed to get existing config map")
}
owner := corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: configMapNamespacedName.Name,
Namespace: configMapNamespacedName.Namespace,
},
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "ConfigMap",
},
Data: make(map[string]string),
}
if err := client.Create(context.Background(), &owner); err != nil {
return nil, errors.Wrap(err, "failed to create config map")
}
defer func() {
if err := client.Delete(context.Background(), &owner); err != nil {
fmt.Println("failed to clean up preflight.")
}
}()
// deploy all collectors
desiredCollectors := make([]*troubleshootv1beta1.Collect, 0, 0)
for _, definedCollector := range preflight.Spec.Collectors {
desiredCollectors = append(desiredCollectors, definedCollector)
@@ -208,109 +136,29 @@ func runCollectors(v *viper.Viper, preflight troubleshootv1beta1.Preflight) (map
desiredCollectors = ensureCollectorInList(desiredCollectors, troubleshootv1beta1.Collect{ClusterInfo: &troubleshootv1beta1.ClusterInfo{}})
desiredCollectors = ensureCollectorInList(desiredCollectors, troubleshootv1beta1.Collect{ClusterResources: &troubleshootv1beta1.ClusterResources{}})
podsCreated := make([]*corev1.Pod, 0, 0)
podsDeleted := make([]*corev1.Pod, 0, 0)
allCollectedData := make(map[string][]byte)
resyncPeriod := time.Second
ctx := context.Background()
watchList := cache.NewListWatchFromClient(restClient, "pods", "", fields.Everything())
_, controller := cache.NewInformer(watchList, &corev1.Pod{}, resyncPeriod,
cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj interface{}, newObj interface{}) {
newPod, ok := newObj.(*corev1.Pod)
if !ok {
return
}
oldPod, ok := oldObj.(*corev1.Pod)
if !ok {
return
}
labels := newPod.Labels
troubleshootRole, ok := labels["troubleshoot-role"]
if !ok || troubleshootRole != "preflight" {
return
}
preflightName, ok := labels["preflight"]
if !ok || preflightName != preflight.Name {
return
}
// Run preflights collectors synchronously
for _, desiredCollector := range desiredCollectors {
collector := collect.Collector{
Redact: true,
Collect: desiredCollector,
}
if oldPod.Status.Phase == newPod.Status.Phase {
return
}
if newPod.Status.Phase == corev1.PodFailed {
podsDeleted = append(podsDeleted, newPod)
return
}
if newPod.Status.Phase != corev1.PodSucceeded {
return
}
podLogOpts := corev1.PodLogOptions{}
req := clientset.CoreV1().Pods(newPod.Namespace).GetLogs(newPod.Name, &podLogOpts)
podLogs, err := req.Stream()
if err != nil {
fmt.Println("get stream")
return
}
defer podLogs.Close()
buf := new(bytes.Buffer)
_, err = io.Copy(buf, podLogs)
if err != nil {
fmt.Println("copy logs")
return
}
collectedData, err := parseCollectorOutput(buf.String())
if err != nil {
logger.Printf("parse collected data: %v\n", err)
return
}
for k, v := range collectedData {
allCollectedData[k] = v
}
if err := client.Delete(context.Background(), newPod); err != nil {
fmt.Println("delete pod")
}
podsDeleted = append(podsDeleted, newPod)
},
})
go func() {
controller.Run(ctx.Done())
}()
s := runtime.NewScheme()
s.AddKnownTypes(schema.GroupVersion{Group: "", Version: "v1"}, &corev1.ConfigMap{})
for _, collector := range desiredCollectors {
_, pod, err := collectrunner.CreateCollector(client, s, &owner, preflight.Name, v.GetString("namespace"), serviceAccountName, "preflight", collector, v.GetString("image"), v.GetString("pullpolicy"))
result, err := collector.RunCollectorSync()
if err != nil {
return nil, errors.Wrap(err, "failed to create collector")
return nil, errors.Wrap(err, "failed to run collector")
}
output, err := parseCollectorOutput(string(result))
if err != nil {
return nil, errors.Wrap(err, "failed to parse collector output")
}
for k, v := range output {
allCollectedData[k] = v
}
podsCreated = append(podsCreated, pod)
}
start := time.Now()
for {
if start.Add(time.Second * 30).Before(time.Now()) {
fmt.Println("timeout running preflight")
return nil, err
}
if len(podsDeleted) == len(podsCreated) {
break
}
time.Sleep(time.Millisecond * 200)
}
ctx.Done()
return allCollectedData, nil
}

View File

@@ -1,6 +1,9 @@
package main
import "github.com/replicatedhq/troubleshoot/cmd/preflight/cli"
import (
"github.com/replicatedhq/troubleshoot/cmd/preflight/cli"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
)
func main() {
cli.InitAndExecute()

View File

@@ -1,12 +1,9 @@
package cli
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
@@ -16,20 +13,10 @@ import (
"github.com/ahmetalpbalkan/go-cursor"
"github.com/mholt/archiver"
troubleshootv1beta1 "github.com/replicatedhq/troubleshoot/pkg/apis/troubleshoot/v1beta1"
collectrunner "github.com/replicatedhq/troubleshoot/pkg/collect"
"github.com/replicatedhq/troubleshoot/pkg/logger"
"github.com/replicatedhq/troubleshoot/pkg/collect"
"github.com/spf13/viper"
"github.com/tj/go-spin"
"gopkg.in/yaml.v2"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
)
func runTroubleshootNoCRD(v *viper.Viper, arg string) error {
@@ -119,66 +106,6 @@ the %s Admin Console to begin analysis.`
}
func runCollectors(v *viper.Viper, collector troubleshootv1beta1.Collector, progressChan chan string) (string, error) {
cfg, err := config.GetConfig()
if err != nil {
return "", err
}
client, err := client.New(cfg, client.Options{})
if err != nil {
return "", err
}
clientset, err := kubernetes.NewForConfig(cfg)
if err != nil {
return "", err
}
restClient := clientset.CoreV1().RESTClient()
serviceAccountName := v.GetString("serviceaccount")
if serviceAccountName == "" {
generatedServiceAccountName, err := createServiceAccount(collector, v.GetString("namespace"), clientset)
if err != nil {
return "", err
}
defer removeServiceAccount(generatedServiceAccountName, v.GetString("namespace"), clientset)
serviceAccountName = generatedServiceAccountName
}
// deploy an object that "owns" everything to aid in cleanup
owner := corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("troubleshoot-%s-owner", collector.Name),
Namespace: v.GetString("namespace"),
},
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "ConfigMap",
},
Data: make(map[string]string),
}
if err := client.Create(context.Background(), &owner); err != nil {
return "", err
}
defer func() {
if err := client.Delete(context.Background(), &owner); err != nil {
fmt.Println("failed to clean up preflight.")
}
}()
// deploy all collectors
desiredCollectors := make([]*troubleshootv1beta1.Collect, 0, 0)
for _, definedCollector := range collector.Spec {
desiredCollectors = append(desiredCollectors, definedCollector)
}
desiredCollectors = ensureCollectorInList(desiredCollectors, troubleshootv1beta1.Collect{ClusterInfo: &troubleshootv1beta1.ClusterInfo{}})
desiredCollectors = ensureCollectorInList(desiredCollectors, troubleshootv1beta1.Collect{ClusterResources: &troubleshootv1beta1.ClusterResources{}})
podsCreated := make([]*corev1.Pod, 0, 0)
podsDeleted := make([]*corev1.Pod, 0, 0)
collectorDirs := []string{}
bundlePath, err := ioutil.TempDir("", "troubleshoot")
if err != nil {
return "", err
@@ -190,114 +117,42 @@ func runCollectors(v *viper.Viper, collector troubleshootv1beta1.Collector, prog
return "", err
}
resyncPeriod := time.Second
ctx := context.Background()
watchList := cache.NewListWatchFromClient(restClient, "pods", "", fields.Everything())
_, controller := cache.NewInformer(watchList, &corev1.Pod{}, resyncPeriod,
cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj interface{}, newObj interface{}) {
newPod, ok := newObj.(*corev1.Pod)
if !ok {
return
}
oldPod, ok := oldObj.(*corev1.Pod)
if !ok {
return
}
labels := newPod.Labels
desiredCollectors := make([]*troubleshootv1beta1.Collect, 0, 0)
for _, definedCollector := range collector.Spec {
desiredCollectors = append(desiredCollectors, definedCollector)
}
desiredCollectors = ensureCollectorInList(desiredCollectors, troubleshootv1beta1.Collect{ClusterInfo: &troubleshootv1beta1.ClusterInfo{}})
desiredCollectors = ensureCollectorInList(desiredCollectors, troubleshootv1beta1.Collect{ClusterResources: &troubleshootv1beta1.ClusterResources{}})
troubleshootRole, ok := labels["troubleshoot-role"]
if !ok || troubleshootRole != "troubleshoot" {
return
}
collectorDirs := []string{}
collectorName, ok := labels["troubleshoot"]
if !ok || collectorName != collector.Name {
return
}
// Run preflights collectors synchronously
for _, desiredCollector := range desiredCollectors {
collector := collect.Collector{
Redact: true,
Collect: desiredCollector,
}
if oldPod.Status.Phase == newPod.Status.Phase {
return
}
if newPod.Status.Phase == corev1.PodFailed {
podsDeleted = append(podsDeleted, newPod)
return
}
if newPod.Status.Phase != corev1.PodSucceeded {
return
}
podLogOpts := corev1.PodLogOptions{}
req := clientset.CoreV1().Pods(newPod.Namespace).GetLogs(newPod.Name, &podLogOpts)
podLogs, err := req.Stream()
if err != nil {
fmt.Println("get stream")
return
}
defer podLogs.Close()
buf := new(bytes.Buffer)
_, err = io.Copy(buf, podLogs)
if err != nil {
fmt.Println("copy logs")
return
}
collectorDir, err := parseAndSaveCollectorOutput(buf.String(), bundlePath)
if err != nil {
logger.Printf("parse collected data: %v\n", err)
return
}
// empty dir name will make tar fail
if collectorDir == "" {
logger.Printf("pod %s did not return any files\n", newPod.Name)
return
}
progressChan <- collectorDir
collectorDirs = append(collectorDirs, collectorDir)
if err := client.Delete(context.Background(), newPod); err != nil {
fmt.Println("delete pod error", err)
}
podsDeleted = append(podsDeleted, newPod)
},
})
go func() {
controller.Run(ctx.Done())
}()
s := runtime.NewScheme()
s.AddKnownTypes(schema.GroupVersion{Group: "", Version: "v1"}, &corev1.ConfigMap{})
for _, collect := range desiredCollectors {
_, pod, err := collectrunner.CreateCollector(client, s, &owner, collector.Name, v.GetString("namespace"), serviceAccountName, "troubleshoot", collect, v.GetString("image"), v.GetString("pullpolicy"))
result, err := collector.RunCollectorSync()
if err != nil {
logger.Printf("A collector pod cannot be created: %v\n", err)
progressChan <- fmt.Sprintf("failed to run collector %v", collector)
continue
}
podsCreated = append(podsCreated, pod)
}
start := time.Now()
for {
if start.Add(time.Second * 30).Before(time.Now()) {
fmt.Println("timeout running troubleshoot")
return "", err
collectorDir, err := parseAndSaveCollectorOutput(string(result), bundlePath)
if err != nil {
progressChan <- fmt.Sprintf("failed to parse collector spec: %v", collector)
continue
}
if len(podsDeleted) == len(podsCreated) {
break
if collectorDir == "" {
continue
}
time.Sleep(time.Millisecond * 200)
progressChan <- collectorDir
collectorDirs = append(collectorDirs, collectorDir)
}
ctx.Done()
tarGz := archiver.TarGz{
Tar: &archiver.Tar{
ImplicitTopLevelFolder: false,

View File

@@ -1,6 +1,9 @@
package main
import "github.com/replicatedhq/troubleshoot/cmd/troubleshoot/cli"
import (
"github.com/replicatedhq/troubleshoot/cmd/troubleshoot/cli"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
)
func main() {
cli.InitAndExecute()

View File

@@ -2,7 +2,6 @@ package collect
import (
"encoding/json"
"fmt"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/version"
@@ -20,15 +19,15 @@ type ClusterInfoOutput struct {
Errors []byte `json:"cluster-info/errors.json,omitempty"`
}
func ClusterInfo() error {
func ClusterInfo() ([]byte, error) {
cfg, err := config.GetConfig()
if err != nil {
return errors.Wrap(err, "failed to get kubernetes config")
return nil, errors.Wrap(err, "failed to get kubernetes config")
}
client, err := kubernetes.NewForConfig(cfg)
if err != nil {
return errors.Wrap(err, "Failed to create kuberenetes clientset")
return nil, errors.Wrap(err, "Failed to create kubernetes clientset")
}
clusterInfoOutput := ClusterInfoOutput{}
@@ -38,17 +37,15 @@ func ClusterInfo() error {
clusterInfoOutput.ClusterVersion = clusterVersion
clusterInfoOutput.Errors, err = marshalNonNil(clusterErrors)
if err != nil {
return errors.Wrap(err, "failed to marshal errors")
return nil, errors.Wrap(err, "failed to marshal errors")
}
b, err := json.MarshalIndent(clusterInfoOutput, "", " ")
if err != nil {
return errors.Wrap(err, "failed to marshal cluster info")
return nil, errors.Wrap(err, "failed to marshal cluster info")
}
fmt.Printf("%s\n", b)
return nil
return b, nil
}
func clusterVersion(client *kubernetes.Clientset) ([]byte, []string) {

View File

@@ -33,15 +33,15 @@ type ClusterResourcesOutput struct {
ImagePullSecretsErrors []byte `json:"cluster-resources/image-pull-secrets-errors.json,omitempty"`
}
func ClusterResources(redact bool) error {
func ClusterResources(redact bool) ([]byte, error) {
cfg, err := config.GetConfig()
if err != nil {
return err
return nil, err
}
client, err := kubernetes.NewForConfig(cfg)
if err != nil {
return err
return nil, err
}
clusterResourcesOutput := &ClusterResourcesOutput{}
@@ -51,7 +51,7 @@ func ClusterResources(redact bool) error {
clusterResourcesOutput.Namespaces = namespaces
clusterResourcesOutput.NamespacesErrors, err = marshalNonNil(nsErrors)
if err != nil {
return err
return nil, err
}
namespaceNames := make([]string, 0, 0)
@@ -63,7 +63,7 @@ func ClusterResources(redact bool) error {
clusterResourcesOutput.Pods = pods
clusterResourcesOutput.PodsErrors, err = marshalNonNil(podErrors)
if err != nil {
return err
return nil, err
}
// services
@@ -71,7 +71,7 @@ func ClusterResources(redact bool) error {
clusterResourcesOutput.Services = services
clusterResourcesOutput.ServicesErrors, err = marshalNonNil(servicesErrors)
if err != nil {
return err
return nil, err
}
// deployments
@@ -79,7 +79,7 @@ func ClusterResources(redact bool) error {
clusterResourcesOutput.Deployments = deployments
clusterResourcesOutput.DeploymentsErrors, err = marshalNonNil(deploymentsErrors)
if err != nil {
return err
return nil, err
}
// ingress
@@ -87,7 +87,7 @@ func ClusterResources(redact bool) error {
clusterResourcesOutput.Ingress = ingress
clusterResourcesOutput.IngressErrors, err = marshalNonNil(ingressErrors)
if err != nil {
return err
return nil, err
}
// storage classes
@@ -95,19 +95,19 @@ func ClusterResources(redact bool) error {
clusterResourcesOutput.StorageClasses = storageClasses
clusterResourcesOutput.StorageErrors, err = marshalNonNil(storageErrors)
if err != nil {
return err
return nil, err
}
// crds
crdClient, err := apiextensionsv1beta1clientset.NewForConfig(cfg)
if err != nil {
return err
return nil, err
}
customResourceDefinitions, crdErrors := crds(crdClient)
clusterResourcesOutput.CustomResourceDefinitions = customResourceDefinitions
clusterResourcesOutput.CustomResourceDefinitionsErrors, err = marshalNonNil(crdErrors)
if err != nil {
return err
return nil, err
}
// imagepullsecrets
@@ -115,24 +115,22 @@ func ClusterResources(redact bool) error {
clusterResourcesOutput.ImagePullSecrets = imagePullSecrets
clusterResourcesOutput.ImagePullSecretsErrors, err = marshalNonNil(pullSecretsErrors)
if err != nil {
return err
return nil, err
}
if redact {
clusterResourcesOutput, err = clusterResourcesOutput.Redact()
if err != nil {
return err
return nil, err
}
}
b, err := json.MarshalIndent(clusterResourcesOutput, "", " ")
if err != nil {
return err
return nil, err
}
fmt.Printf("%s\n", b)
return nil
return b, nil
}
func namespaces(client *kubernetes.Clientset) ([]byte, *corev1.NamespaceList, []string) {

View File

@@ -8,45 +8,40 @@ import (
)
type Collector struct {
Spec string
Redact bool
Collect *troubleshootv1beta1.Collect
Redact bool
}
func (c *Collector) RunCollectorSync() error {
collect, err := parseSpec(c.Spec)
if err != nil {
return err
}
if collect.ClusterInfo != nil {
func (c *Collector) RunCollectorSync() ([]byte, error) {
if c.Collect.ClusterInfo != nil {
return ClusterInfo()
}
if collect.ClusterResources != nil {
if c.Collect.ClusterResources != nil {
return ClusterResources(c.Redact)
}
if collect.Secret != nil {
return Secret(collect.Secret, c.Redact)
if c.Collect.Secret != nil {
return Secret(c.Collect.Secret, c.Redact)
}
if collect.Logs != nil {
return Logs(collect.Logs, c.Redact)
if c.Collect.Logs != nil {
return Logs(c.Collect.Logs, c.Redact)
}
if collect.Run != nil {
return Run(collect.Run, c.Redact)
if c.Collect.Run != nil {
return Run(c.Collect.Run, c.Redact)
}
if collect.Exec != nil {
return Exec(collect.Exec, c.Redact)
if c.Collect.Exec != nil {
return Exec(c.Collect.Exec, c.Redact)
}
if collect.Copy != nil {
return Copy(collect.Copy, c.Redact)
if c.Collect.Copy != nil {
return Copy(c.Collect.Copy, c.Redact)
}
if collect.HTTP != nil {
return HTTP(collect.HTTP, c.Redact)
if c.Collect.HTTP != nil {
return HTTP(c.Collect.HTTP, c.Redact)
}
return errors.New("no spec found to run")
return nil, errors.New("no spec found to run")
}
func parseSpec(specContents string) (*troubleshootv1beta1.Collect, error) {
func ParseSpec(specContents string) (*troubleshootv1beta1.Collect, error) {
collect := troubleshootv1beta1.Collect{}
if err := yaml.Unmarshal([]byte(specContents), &collect); err != nil {

View File

@@ -26,7 +26,7 @@ func Test_ParseSpec(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
c, err := parseSpec(test.spec)
c, err := ParseSpec(test.spec)
if test.expectError {
assert.Error(t, err)

View File

@@ -18,15 +18,15 @@ type CopyOutput struct {
Errors map[string][]byte `json:"copy-errors/,omitempty"`
}
func Copy(copyCollector *troubleshootv1beta1.Copy, redact bool) error {
func Copy(copyCollector *troubleshootv1beta1.Copy, redact bool) ([]byte, error) {
cfg, err := config.GetConfig()
if err != nil {
return err
return nil, err
}
client, err := kubernetes.NewForConfig(cfg)
if err != nil {
return err
return nil, err
}
copyOutput := &CopyOutput{
@@ -38,7 +38,7 @@ func Copy(copyCollector *troubleshootv1beta1.Copy, redact bool) error {
if len(podsErrors) > 0 {
errorBytes, err := marshalNonNil(podsErrors)
if err != nil {
return err
return nil, err
}
copyOutput.Errors[getCopyErrosFileName(copyCollector)] = errorBytes
}
@@ -50,7 +50,7 @@ func Copy(copyCollector *troubleshootv1beta1.Copy, redact bool) error {
key := fmt.Sprintf("%s/%s/%s-errors.json", pod.Namespace, pod.Name, copyCollector.ContainerPath)
copyOutput.Errors[key], err = marshalNonNil(copyErrors)
if err != nil {
return err
return nil, err
}
continue
}
@@ -67,12 +67,10 @@ func Copy(copyCollector *troubleshootv1beta1.Copy, redact bool) error {
b, err := json.MarshalIndent(copyOutput, "", " ")
if err != nil {
return err
return nil, err
}
fmt.Printf("%s\n", b)
return nil
return b, nil
}
func copyFiles(client *kubernetes.Clientset, pod corev1.Pod, copyCollector *troubleshootv1beta1.Copy) (map[string][]byte, map[string]string) {

View File

@@ -20,38 +20,47 @@ type ExecOutput struct {
Errors map[string][]byte `json:"exec-errors/,omitempty"`
}
func Exec(execCollector *troubleshootv1beta1.Exec, redact bool) error {
func Exec(execCollector *troubleshootv1beta1.Exec, redact bool) ([]byte, error) {
if execCollector.Timeout == "" {
return execWithoutTimeout(execCollector, redact)
}
timeout, err := time.ParseDuration(execCollector.Timeout)
if err != nil {
return err
return nil, err
}
execChan := make(chan error, 1)
errCh := make(chan error, 1)
resultCh := make(chan []byte, 1)
go func() {
execChan <- execWithoutTimeout(execCollector, redact)
b, err := execWithoutTimeout(execCollector, redact)
if err != nil {
errCh <- err
} else {
resultCh <- b
}
}()
select {
case <-time.After(timeout):
return errors.New("timeout")
case err := <-execChan:
return err
return nil, errors.New("timeout")
case result := <-resultCh:
return result, nil
case err := <-errCh:
return nil, err
}
}
func execWithoutTimeout(execCollector *troubleshootv1beta1.Exec, redact bool) error {
func execWithoutTimeout(execCollector *troubleshootv1beta1.Exec, redact bool) ([]byte, error) {
cfg, err := config.GetConfig()
if err != nil {
return err
return nil, err
}
client, err := kubernetes.NewForConfig(cfg)
if err != nil {
return err
return nil, err
}
execOutput := &ExecOutput{
@@ -63,7 +72,7 @@ func execWithoutTimeout(execCollector *troubleshootv1beta1.Exec, redact bool) er
if len(podsErrors) > 0 {
errorBytes, err := marshalNonNil(podsErrors)
if err != nil {
return err
return nil, err
}
execOutput.Errors[getExecErrosFileName(execCollector)] = errorBytes
}
@@ -76,7 +85,7 @@ func execWithoutTimeout(execCollector *troubleshootv1beta1.Exec, redact bool) er
if len(execErrors) > 0 {
errorBytes, err := marshalNonNil(execErrors)
if err != nil {
return err
return nil, err
}
execOutput.Results[fmt.Sprintf("%s/%s/%s-errors.json", pod.Namespace, pod.Name, execCollector.CollectorName)] = errorBytes
continue
@@ -86,19 +95,17 @@ func execWithoutTimeout(execCollector *troubleshootv1beta1.Exec, redact bool) er
if redact {
execOutput, err = execOutput.Redact()
if err != nil {
return err
return nil, err
}
}
}
b, err := json.MarshalIndent(execOutput, "", " ")
if err != nil {
return err
return nil, err
}
fmt.Printf("%s\n", b)
return nil
return b, nil
}
func getExecOutputs(client *kubernetes.Clientset, pod corev1.Pod, execCollector *troubleshootv1beta1.Exec, doRedact bool) ([]byte, []byte, []string) {

View File

@@ -4,7 +4,6 @@ import (
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"strings"
@@ -27,7 +26,7 @@ type httpError struct {
Message string `json:"message"`
}
func HTTP(httpCollector *troubleshootv1beta1.HTTP, redact bool) error {
func HTTP(httpCollector *troubleshootv1beta1.HTTP, redact bool) ([]byte, error) {
var response *http.Response
var err error
@@ -38,12 +37,12 @@ func HTTP(httpCollector *troubleshootv1beta1.HTTP, redact bool) error {
} else if httpCollector.Put != nil {
response, err = doPut(httpCollector.Put)
} else {
return errors.New("no supported http request type")
return nil, errors.New("no supported http request type")
}
output, err := responseToOutput(response, err, redact)
if err != nil {
return err
return nil, err
}
httpOutput := &HTTPOutput{
@@ -54,12 +53,10 @@ func HTTP(httpCollector *troubleshootv1beta1.HTTP, redact bool) error {
b, err := json.MarshalIndent(httpOutput, "", " ")
if err != nil {
return err
return nil, err
}
fmt.Printf("%s\n", b)
return nil
return b, nil
}
func doGet(get *troubleshootv1beta1.Get) (*http.Response, error) {

View File

@@ -21,15 +21,15 @@ type LogsOutput struct {
Errors map[string][]byte `json:"logs-errors/,omitempty"`
}
func Logs(logsCollector *troubleshootv1beta1.Logs, redact bool) error {
func Logs(logsCollector *troubleshootv1beta1.Logs, redact bool) ([]byte, error) {
cfg, err := config.GetConfig()
if err != nil {
return err
return nil, err
}
client, err := kubernetes.NewForConfig(cfg)
if err != nil {
return err
return nil, err
}
logsOutput := &LogsOutput{
@@ -41,7 +41,7 @@ func Logs(logsCollector *troubleshootv1beta1.Logs, redact bool) error {
if len(podsErrors) > 0 {
errorBytes, err := marshalNonNil(podsErrors)
if err != nil {
return err
return nil, err
}
logsOutput.Errors[getLogsErrorsFileName(logsCollector)] = errorBytes
}
@@ -53,7 +53,7 @@ func Logs(logsCollector *troubleshootv1beta1.Logs, redact bool) error {
key := fmt.Sprintf("%s/%s-errors.json", pod.Namespace, pod.Name)
logsOutput.Errors[key], err = marshalNonNil([]string{err.Error()})
if err != nil {
return err
return nil, err
}
continue
}
@@ -66,19 +66,17 @@ func Logs(logsCollector *troubleshootv1beta1.Logs, redact bool) error {
if redact {
logsOutput, err = logsOutput.Redact()
if err != nil {
return err
return nil, err
}
}
}
b, err := json.MarshalIndent(logsOutput, "", " ")
if err != nil {
return err
return nil, err
}
fmt.Printf("%s\n", b)
return nil
return b, nil
}
func listPodsInSelectors(client *kubernetes.Clientset, namespace string, selector []string) ([]corev1.Pod, []string) {

View File

@@ -3,7 +3,6 @@ package collect
import (
"encoding/json"
"errors"
"fmt"
"time"
troubleshootv1beta1 "github.com/replicatedhq/troubleshoot/pkg/apis/troubleshoot/v1beta1"
@@ -18,20 +17,20 @@ type RunOutput struct {
PodLogs map[string][]byte `json:"run/,omitempty"`
}
func Run(runCollector *troubleshootv1beta1.Run, redact bool) error {
func Run(runCollector *troubleshootv1beta1.Run, redact bool) ([]byte, error) {
cfg, err := config.GetConfig()
if err != nil {
return err
return nil, err
}
client, err := kubernetes.NewForConfig(cfg)
if err != nil {
return err
return nil, err
}
pod, err := runPod(client, runCollector)
if err != nil {
return err
return nil, err
}
defer func() {
@@ -46,37 +45,45 @@ func Run(runCollector *troubleshootv1beta1.Run, redact bool) error {
timeout, err := time.ParseDuration(runCollector.Timeout)
if err != nil {
return err
return nil, err
}
runChan := make(chan error, 1)
errCh := make(chan error, 1)
resultCh := make(chan []byte, 1)
go func() {
runChan <- runWithoutTimeout(pod, runCollector, redact)
b, err := runWithoutTimeout(pod, runCollector, redact)
if err != nil {
errCh <- err
} else {
resultCh <- b
}
}()
select {
case <-time.After(timeout):
return errors.New("timeout")
case err := <-runChan:
return err
return nil, errors.New("timeout")
case result := <-resultCh:
return result, nil
case err := <-errCh:
return nil, err
}
}
func runWithoutTimeout(pod *corev1.Pod, runCollector *troubleshootv1beta1.Run, redact bool) error {
func runWithoutTimeout(pod *corev1.Pod, runCollector *troubleshootv1beta1.Run, redact bool) ([]byte, error) {
cfg, err := config.GetConfig()
if err != nil {
return err
return nil, err
}
client, err := kubernetes.NewForConfig(cfg)
if err != nil {
return err
return nil, err
}
for {
status, err := client.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
if err != nil {
return err
return nil, err
}
if status.Status.Phase == "Running" {
break
@@ -100,18 +107,16 @@ func runWithoutTimeout(pod *corev1.Pod, runCollector *troubleshootv1beta1.Run, r
if redact {
runOutput, err = runOutput.Redact()
if err != nil {
return err
return nil, err
}
}
b, err := json.MarshalIndent(runOutput, "", " ")
if err != nil {
return err
return nil, err
}
fmt.Printf("%s\n", b)
return nil
return b, nil
}
func runPod(client *kubernetes.Clientset, runCollector *troubleshootv1beta1.Run) (*corev1.Pod, error) {

View File

@@ -23,15 +23,15 @@ type SecretOutput struct {
Errors map[string][]byte `json:"secrets-errors/,omitempty"`
}
func Secret(secretCollector *troubleshootv1beta1.Secret, redact bool) error {
func Secret(secretCollector *troubleshootv1beta1.Secret, redact bool) ([]byte, error) {
cfg, err := config.GetConfig()
if err != nil {
return err
return nil, err
}
client, err := kubernetes.NewForConfig(cfg)
if err != nil {
return err
return nil, err
}
secretOutput := &SecretOutput{
@@ -43,7 +43,7 @@ func Secret(secretCollector *troubleshootv1beta1.Secret, redact bool) error {
if err != nil {
errorBytes, err := marshalNonNil([]string{err.Error()})
if err != nil {
return err
return nil, err
}
secretOutput.Errors[fmt.Sprintf("%s/%s.json", secret.Namespace, secret.Name)] = errorBytes
}
@@ -52,19 +52,17 @@ func Secret(secretCollector *troubleshootv1beta1.Secret, redact bool) error {
if redact {
secretOutput, err = secretOutput.Redact()
if err != nil {
return err
return nil, err
}
}
}
b, err := json.MarshalIndent(secretOutput, "", " ")
if err != nil {
return err
return nil, err
}
fmt.Printf("%s\n", b)
return nil
return b, nil
}
func secret(client *kubernetes.Clientset, secretCollector *troubleshootv1beta1.Secret) (*FoundSecret, []byte, error) {