diff --git a/pkg/goldpinger/pinger.go b/pkg/goldpinger/pinger.go
index 5ce613a..c743265 100644
--- a/pkg/goldpinger/pinger.go
+++ b/pkg/goldpinger/pinger.go
@@ -1,58 +1,67 @@
 package goldpinger
 
 import (
-	"log"
+	"context"
 	"time"
 
-	apiclient "github.com/bloomberg/goldpinger/pkg/client"
-	"github.com/bloomberg/goldpinger/pkg/models"
+	"go.uber.org/zap"
+
+	apiclient "github.com/bloomberg/goldpinger/v3/pkg/client"
+	"github.com/bloomberg/goldpinger/v3/pkg/client/operations"
+	"github.com/bloomberg/goldpinger/v3/pkg/models"
 
 	"github.com/prometheus/client_golang/prometheus"
 	"k8s.io/apimachinery/pkg/util/wait"
 )
 
 // Pinger contains all the info needed by a goroutine to continuously ping a pod
 type Pinger struct {
-	podIP       string
-	hostIP      string
+	pod         *GoldpingerPod
 	client      *apiclient.Goldpinger
-	timer       *prometheus.Timer
+	timeout     time.Duration
 	histogram   prometheus.Observer
 	result      PingAllPodsResult
 	resultsChan chan<- PingAllPodsResult
 	stopChan    chan struct{}
+	logger      *zap.Logger
 }
 
 // NewPinger constructs and returns a Pinger object responsible for pinging a single
 // goldpinger pod
-func NewPinger(podIP string, hostIP string, resultsChan chan<- PingAllPodsResult) *Pinger {
+func NewPinger(pod *GoldpingerPod, resultsChan chan<- PingAllPodsResult) *Pinger {
 	p := Pinger{
-		podIP:       podIP,
-		hostIP:      hostIP,
+		pod:         pod,
+		timeout:     time.Duration(GoldpingerConfig.PingTimeoutMs) * time.Millisecond,
 		resultsChan: resultsChan,
 		stopChan:    make(chan struct{}),
 		histogram: goldpingerResponseTimePeersHistogram.WithLabelValues(
 			GoldpingerConfig.Hostname,
 			"ping",
-			hostIP,
-			podIP,
+			pod.HostIP,
+			pod.PodIP,
+		),
+
+		logger: zap.L().With(
+			zap.String("op", "pinger"),
+			zap.String("name", pod.Name),
+			zap.String("hostIP", pod.HostIP),
+			zap.String("podIP", pod.PodIP),
 		),
 	}
 
 	// Initialize the result
-	p.result.hostIPv4.UnmarshalText([]byte(hostIP))
-	p.result.podIP = podIP
+	p.result.hostIPv4.UnmarshalText([]byte(pod.HostIP))
+	p.result.podIPv4.UnmarshalText([]byte(pod.PodIP))
 
 	// Get a client for pinging the given pod
 	// On error, create a static pod result that does nothing
-	client, err := getClient(pickPodHostIP(podIP, hostIP))
+	client, err := getClient(pickPodHostIP(pod.PodIP, pod.HostIP))
 	if err == nil {
 		p.client = client
 	} else {
 		OK := false
 		p.client = nil
 		p.result.podResult = models.PodResult{HostIP: p.result.hostIPv4, OK: &OK, Error: err.Error(), StatusCode: 500, ResponseTimeMs: 0}
-		p.result.podIP = hostIP
 	}
 	return &p
 }
@@ -62,18 +71,36 @@
 func (p *Pinger) Ping() {
 	CountCall("made", "ping")
 	start := time.Now()
-	resp, err := p.client.Operations.Ping(nil)
+	ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
+	defer cancel()
+
+	params := operations.NewPingParamsWithContext(ctx)
+	resp, err := p.client.Operations.Ping(params)
 	responseTime := time.Since(start)
 	responseTimeMs := responseTime.Nanoseconds() / int64(time.Millisecond)
 	p.histogram.Observe(responseTime.Seconds())
 
 	OK := (err == nil)
 	if OK {
-		p.result.podResult = models.PodResult{HostIP: p.result.hostIPv4, OK: &OK, Response: resp.Payload, StatusCode: 200, ResponseTimeMs: responseTimeMs}
-		log.Printf("Success pinging pod: %s, host: %s, resp: %+v, response time: %+v", p.podIP, p.hostIP, resp.Payload, responseTime)
+		p.result.podResult = models.PodResult{
+			PodIP:          p.result.podIPv4,
+			HostIP:         p.result.hostIPv4,
+			OK:             &OK,
+			Response:       resp.Payload,
+			StatusCode:     200,
+			ResponseTimeMs: responseTimeMs,
+		}
+		p.logger.Debug("Success pinging pod", zap.Duration("responseTime", responseTime))
 	} else {
-		p.result.podResult = models.PodResult{HostIP: p.result.hostIPv4, OK: &OK, Error: err.Error(), StatusCode: 504, ResponseTimeMs: responseTimeMs}
-		log.Printf("Error pinging pod: %s, host: %s, err: %+v, response time: %+v", p.podIP, p.hostIP, err, responseTime)
+		p.result.podResult = models.PodResult{
+			PodIP:          p.result.podIPv4,
+			HostIP:         p.result.hostIPv4,
+			OK:             &OK,
+			Error:          err.Error(),
+			StatusCode:     504,
+			ResponseTimeMs: responseTimeMs,
+		}
+		p.logger.Warn("Ping returned error", zap.Duration("responseTime", responseTime), zap.Error(err))
 		CountError("ping")
 	}
 	p.resultsChan <- p.result
diff --git a/pkg/goldpinger/updater.go b/pkg/goldpinger/updater.go
index 676a953..b12567b 100644
--- a/pkg/goldpinger/updater.go
+++ b/pkg/goldpinger/updater.go
@@ -15,39 +15,54 @@
 package goldpinger
 
 import (
-	"context"
-	"fmt"
-	"log"
-	"sync"
 	"time"
 
 	"go.uber.org/zap"
 
-	"github.com/bloomberg/goldpinger/pkg/models"
-	"github.com/go-openapi/strfmt"
+	"github.com/bloomberg/goldpinger/v3/pkg/models"
 )
 
-// resultsMux controls access to the results from multiple goroutines
-var resultsMux sync.Mutex
+// checkResults holds the latest results of checking the pods
+var checkResults models.CheckResults
+
+// counterHealthy is the number of healthy pods
+var counterHealthy float64
 
 // getPingers creates a new set of pingers for the given pods
 // Each pinger is responsible for pinging a single pod and returns
 // the results on the results channel
-func getPingers(pods map[string]string, resultsChan chan<- PingAllPodsResult) map[string]*Pinger {
+func getPingers(pods map[string]*GoldpingerPod, resultsChan chan<- PingAllPodsResult) map[string]*Pinger {
 	pingers := map[string]*Pinger{}
-	for podIP, hostIP := range pods {
-		pingers[podIP] = NewPinger(podIP, hostIP, resultsChan)
+	for podName, pod := range pods {
+		pingers[podName] = NewPinger(pod, resultsChan)
 	}
 	return pingers
 }
 
+// initCheckResults initializes the check results, which will be updated continuously
+// as the results come in
+func initCheckResults(pingers map[string]*Pinger) {
+	checkResults = models.CheckResults{}
+	checkResults.PodResults = make(map[string]models.PodResult)
+	for podName, pinger := range pingers {
+		checkResults.PodResults[podName] = pinger.result.podResult
+	}
+
+	counterHealthy = 0
+}
+
 // startPingers starts `n` goroutines to continuously ping all the given pods, one goroutine per pod
 // It staggers the start of all the go-routines to prevent a thundering herd
 func startPingers(pingers map[string]*Pinger) {
 	refreshPeriod := time.Duration(GoldpingerConfig.RefreshInterval) * time.Second
 	waitBetweenPods := refreshPeriod / time.Duration(len(pingers))
 
-	log.Printf("Refresh Period: %+v Wait Period: %+v Jitter Factor: %+v", refreshPeriod, waitBetweenPods, GoldpingerConfig.JitterFactor)
+	zap.L().Info(
+		"Starting Pingers",
+		zap.Duration("refreshPeriod", refreshPeriod),
+		zap.Duration("waitPeriod", waitBetweenPods),
+		zap.Float64("JitterFactor", GoldpingerConfig.JitterFactor),
+	)
 
 	for _, p := range pingers {
 		go p.PingContinuously(refreshPeriod, GoldpingerConfig.JitterFactor)
@@ -55,51 +70,38 @@ func startPingers(pingers map[string]*Pinger) {
 	}
 }
 
-// collectResults simply reads results from the results channel and saves them in a map
-func collectResults(resultsChan <-chan PingAllPodsResult) *models.CheckResults {
-	results := models.CheckResults{}
-	results.PodResults = make(map[string]models.PodResult)
-	go func() {
-		for response := range resultsChan {
-			var podIPv4 strfmt.IPv4
-			podIPv4.UnmarshalText([]byte(response.podIP))
+// updateCounters updates the value of healthy and unhealthy nodes as the results come in
+func updateCounters(podName string, result *models.PodResult) {
+	// Get the previous value of ok
+	old := checkResults.PodResults[podName]
+	oldOk := (old.OK != nil && *old.OK)
 
-			// results.PodResults map will be read by processResults()
-			// to count the number of healthy and unhealthy nodes
-			// Since concurrent access to maps isn't safe from multiple
-			// goroutines, lock the mutex before update
-			resultsMux.Lock()
-			results.PodResults[response.podIP] = response.podResult
-			resultsMux.Unlock()
-		}
-	}()
-	return &results
+	// Check if the value of ok has changed
+	// If not, do nothing
+	if oldOk == *result.OK {
+		return
+	}
+
+	if *result.OK {
+		// The value was previously false and just became true
+		// Increment the counter
+		counterHealthy++
+	} else {
+		// The value was previously true and just became false
+		counterHealthy--
+	}
+	CountHealthyUnhealthyNodes(counterHealthy, float64(len(checkResults.PodResults))-counterHealthy)
 }
 
-// processResults goes through all the entries in the results channel and counts
-// the number of health and unhealth nodes. It just reports the correct number
-func processResults(results *models.CheckResults) {
-	for {
-		var troublemakers []string
-		var counterHealthy, counterUnhealthy float64
-
-		resultsMux.Lock()
-		for podIP, value := range results.PodResults {
-			if *value.OK != true {
-				counterUnhealthy++
-				troublemakers = append(troublemakers, fmt.Sprintf("%s (%s)", podIP, value.HostIP.String()))
-			} else {
-				counterHealthy++
-			}
+// collectResults simply reads results from the results channel and saves them in a map
+func collectResults(resultsChan <-chan PingAllPodsResult) {
+	go func() {
+		for response := range resultsChan {
+			result := response.podResult
+			updateCounters(response.podName, &result)
+			checkResults.PodResults[response.podName] = result
 		}
-		resultsMux.Unlock()
-
-		CountHealthyUnhealthyNodes(counterHealthy, counterUnhealthy)
-		if len(troublemakers) > 0 {
-			log.Println("Updater ran into trouble with these peers: ", troublemakers)
-		}
-		time.Sleep(time.Duration(GoldpingerConfig.RefreshInterval) * time.Second)
-	}
+	}()
 }
 
 func StartUpdater() {
@@ -108,14 +110,13 @@
 		return
 	}
 
-	pods := GoldpingerConfig.PodSelecter.SelectPods()
+	pods := SelectPods()
 	zap.S().Infof("Got Pods: %+v", pods)
 
 	// Create a channel for the results
 	resultsChan := make(chan PingAllPodsResult, len(pods))
 
 	pingers := getPingers(pods, resultsChan)
-
+	initCheckResults(pingers)
 	startPingers(pingers)
-	results := collectResults(resultsChan)
-	go processResults(results)
+	collectResults(resultsChan)