Get rid of the lock and keep a running count of healthy/unhealthy nodes

Signed-off-by: Sachin Kamboj <skamboj1@bloomberg.net>
Author: Sachin Kamboj
Date:   2020-04-03 08:56:30 -04:00
parent: 8a40aee927
commit: 40f57b1a4e

2 changed files with 104 additions and 76 deletions
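
The change replaces a mutex-guarded map scan with a running counter owned by a single collector goroutine: results arrive on a channel, and the counter moves only when a pod's health flips. Before the diff, a minimal self-contained sketch of that pattern (all names and types here are illustrative, not goldpinger's own):

    package main

    import "fmt"

    // result is a hypothetical per-pod health report.
    type result struct {
        name string
        ok   bool
    }

    func main() {
        results := make(chan result)
        done := make(chan struct{})

        // A single collector goroutine owns both the map and the counter,
        // so no mutex is needed: every read and write happens here.
        go func() {
            defer close(done)
            latest := map[string]bool{}
            healthy := 0
            for r := range results {
                // Adjust the running count only when a pod's health flips,
                // instead of re-scanning the whole map every refresh period.
                if latest[r.name] != r.ok {
                    if r.ok {
                        healthy++
                    } else {
                        healthy--
                    }
                }
                latest[r.name] = r.ok
                fmt.Printf("%s ok=%v -> healthy=%d unhealthy=%d\n",
                    r.name, r.ok, healthy, len(latest)-healthy)
            }
        }()

        results <- result{"pod-a", true}
        results <- result{"pod-a", true}  // no flip: count unchanged
        results <- result{"pod-a", false} // flip: count drops back to 0
        close(results)
        <-done
    }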

@@ -1,58 +1,67 @@
 package goldpinger

 import (
-    "log"
+    "context"
     "time"

-    apiclient "github.com/bloomberg/goldpinger/pkg/client"
-    "github.com/bloomberg/goldpinger/pkg/models"
     "go.uber.org/zap"

+    apiclient "github.com/bloomberg/goldpinger/v3/pkg/client"
+    "github.com/bloomberg/goldpinger/v3/pkg/client/operations"
+    "github.com/bloomberg/goldpinger/v3/pkg/models"
+
     "github.com/prometheus/client_golang/prometheus"
     "k8s.io/apimachinery/pkg/util/wait"
 )

 // Pinger contains all the info needed by a goroutine to continuously ping a pod
 type Pinger struct {
-    podIP       string
-    hostIP      string
+    pod         *GoldpingerPod
     client      *apiclient.Goldpinger
-    timer       *prometheus.Timer
+    timeout     time.Duration
     histogram   prometheus.Observer
     result      PingAllPodsResult
     resultsChan chan<- PingAllPodsResult
     stopChan    chan struct{}
+    logger      *zap.Logger
 }

 // NewPinger constructs and returns a Pinger object responsible for pinging a single
 // goldpinger pod
-func NewPinger(podIP string, hostIP string, resultsChan chan<- PingAllPodsResult) *Pinger {
+func NewPinger(pod *GoldpingerPod, resultsChan chan<- PingAllPodsResult) *Pinger {
     p := Pinger{
-        podIP:       podIP,
-        hostIP:      hostIP,
+        pod:         pod,
+        timeout:     time.Duration(GoldpingerConfig.PingTimeoutMs) * time.Millisecond,
         resultsChan: resultsChan,
         stopChan:    make(chan struct{}),
         histogram: goldpingerResponseTimePeersHistogram.WithLabelValues(
             GoldpingerConfig.Hostname,
             "ping",
-            hostIP,
-            podIP,
+            pod.HostIP,
+            pod.PodIP,
         ),
+        logger: zap.L().With(
+            zap.String("op", "pinger"),
+            zap.String("name", pod.Name),
+            zap.String("hostIP", pod.HostIP),
+            zap.String("podIP", pod.PodIP),
+        ),
     }

     // Initialize the result
-    p.result.hostIPv4.UnmarshalText([]byte(hostIP))
-    p.result.podIP = podIP
+    p.result.hostIPv4.UnmarshalText([]byte(pod.HostIP))
+    p.result.podIPv4.UnmarshalText([]byte(pod.PodIP))

     // Get a client for pinging the given pod
     // On error, create a static pod result that does nothing
-    client, err := getClient(pickPodHostIP(podIP, hostIP))
+    client, err := getClient(pickPodHostIP(pod.PodIP, pod.HostIP))
     if err == nil {
         p.client = client
     } else {
         OK := false
         p.client = nil
         p.result.podResult = models.PodResult{HostIP: p.result.hostIPv4, OK: &OK, Error: err.Error(), StatusCode: 500, ResponseTimeMs: 0}
-        p.result.podIP = hostIP
     }
     return &p
 }
@@ -62,18 +71,36 @@ func (p *Pinger) Ping() {
     CountCall("made", "ping")
     start := time.Now()
-    resp, err := p.client.Operations.Ping(nil)
+    ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
+    defer cancel()
+    params := operations.NewPingParamsWithContext(ctx)
+    resp, err := p.client.Operations.Ping(params)
     responseTime := time.Since(start)
     responseTimeMs := responseTime.Nanoseconds() / int64(time.Millisecond)
     p.histogram.Observe(responseTime.Seconds())
     OK := (err == nil)
     if OK {
-        p.result.podResult = models.PodResult{HostIP: p.result.hostIPv4, OK: &OK, Response: resp.Payload, StatusCode: 200, ResponseTimeMs: responseTimeMs}
-        log.Printf("Success pinging pod: %s, host: %s, resp: %+v, response time: %+v", p.podIP, p.hostIP, resp.Payload, responseTime)
+        p.result.podResult = models.PodResult{
+            PodIP:          p.result.podIPv4,
+            HostIP:         p.result.hostIPv4,
+            OK:             &OK,
+            Response:       resp.Payload,
+            StatusCode:     200,
+            ResponseTimeMs: responseTimeMs,
+        }
+        p.logger.Debug("Success pinging pod", zap.Duration("responseTime", responseTime))
     } else {
-        p.result.podResult = models.PodResult{HostIP: p.result.hostIPv4, OK: &OK, Error: err.Error(), StatusCode: 504, ResponseTimeMs: responseTimeMs}
-        log.Printf("Error pinging pod: %s, host: %s, err: %+v, response time: %+v", p.podIP, p.hostIP, err, responseTime)
+        p.result.podResult = models.PodResult{
+            PodIP:          p.result.podIPv4,
+            HostIP:         p.result.hostIPv4,
+            OK:             &OK,
+            Error:          err.Error(),
+            StatusCode:     504,
+            ResponseTimeMs: responseTimeMs,
+        }
+        p.logger.Warn("Ping returned error", zap.Duration("responseTime", responseTime), zap.Error(err))
         CountError("ping")
     }
     p.resultsChan <- p.result
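
The new Ping() bounds each call with a context deadline built from PingTimeoutMs and passes it through the generated params (operations.NewPingParamsWithContext, as the diff shows). For illustration, the same timeout pattern using only the standard library; the URL, port, and timeout below are placeholders, not goldpinger's configuration:

    package main

    import (
        "context"
        "fmt"
        "net/http"
        "time"
    )

    func ping(timeout time.Duration) error {
        // The context deadline bounds the entire round trip: dialing,
        // writing the request, and reading the response headers.
        ctx, cancel := context.WithTimeout(context.Background(), timeout)
        defer cancel() // always release the context's resources

        req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://10.0.0.1:8080/ping", nil)
        if err != nil {
            return err
        }

        resp, err := http.DefaultClient.Do(req)
        if err != nil {
            return err // includes context.DeadlineExceeded on timeout
        }
        defer resp.Body.Close()

        fmt.Println("status:", resp.StatusCode)
        return nil
    }

    func main() {
        if err := ping(300 * time.Millisecond); err != nil {
            fmt.Println("ping failed:", err)
        }
    }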

@@ -15,39 +15,54 @@
 package goldpinger

 import (
+    "context"
     "fmt"
     "time"
-    "sync"

     "go.uber.org/zap"

-    "github.com/bloomberg/goldpinger/pkg/models"
     "github.com/go-openapi/strfmt"
+    "github.com/bloomberg/goldpinger/v3/pkg/models"
 )

-// resultsMux controls access to the results from multiple goroutines
-var resultsMux sync.Mutex
+// checkResults holds the latest results of checking the pods
+var checkResults models.CheckResults
+
+// counterHealthy is the number of healthy pods
+var counterHealthy float64

 // getPingers creates a new set of pingers for the given pods
 // Each pinger is responsible for pinging a single pod and returns
 // the results on the results channel
-func getPingers(pods map[string]string, resultsChan chan<- PingAllPodsResult) map[string]*Pinger {
+func getPingers(pods map[string]*GoldpingerPod, resultsChan chan<- PingAllPodsResult) map[string]*Pinger {
     pingers := map[string]*Pinger{}
-    for podIP, hostIP := range pods {
-        pingers[podIP] = NewPinger(podIP, hostIP, resultsChan)
+    for podName, pod := range pods {
+        pingers[podName] = NewPinger(pod, resultsChan)
     }
     return pingers
 }

+// initCheckResults initializes the check results, which will be updated continuously
+// as the results come in
+func initCheckResults(pingers map[string]*Pinger) {
+    checkResults = models.CheckResults{}
+    checkResults.PodResults = make(map[string]models.PodResult)
+    for podName, pinger := range pingers {
+        checkResults.PodResults[podName] = pinger.result.podResult
+    }
+    counterHealthy = 0
+}
+
 // startPingers starts `n` goroutines to continuously ping all the given pods, one goroutine per pod
 // It staggers the start of all the go-routines to prevent a thundering herd
 func startPingers(pingers map[string]*Pinger) {
     refreshPeriod := time.Duration(GoldpingerConfig.RefreshInterval) * time.Second
     waitBetweenPods := refreshPeriod / time.Duration(len(pingers))
-    log.Printf("Refresh Period: %+v Wait Period: %+v Jitter Factor: %+v", refreshPeriod, waitBetweenPods, GoldpingerConfig.JitterFactor)
+    zap.L().Info(
+        "Starting Pingers",
+        zap.Duration("refreshPeriod", refreshPeriod),
+        zap.Duration("waitPeriod", waitBetweenPods),
+        zap.Float64("JitterFactor", GoldpingerConfig.JitterFactor),
+    )
     for _, p := range pingers {
         go p.PingContinuously(refreshPeriod, GoldpingerConfig.JitterFactor)
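
startPingers divides the refresh period by the number of pods so the goroutine launches are spread across one full cycle instead of all firing at once. A sketch of one way to do that staggering, assuming each pinger loops with wait.JitterUntil from k8s.io/apimachinery (which the pinger file already imports); the pod names, periods, and ping body are placeholders:

    package main

    import (
        "fmt"
        "time"

        "k8s.io/apimachinery/pkg/util/wait"
    )

    func main() {
        pods := []string{"pod-a", "pod-b", "pod-c", "pod-d"}
        refreshPeriod := 4 * time.Second
        jitterFactor := 0.5

        // Spread the launches evenly across one refresh period so all
        // pods are not pinged at the same instant (thundering herd).
        waitBetweenPods := refreshPeriod / time.Duration(len(pods))

        stopCh := make(chan struct{})
        for _, name := range pods {
            name := name
            go func() {
                // JitterUntil re-runs the func roughly every refreshPeriod,
                // randomized by up to jitterFactor*refreshPeriod.
                wait.JitterUntil(func() {
                    fmt.Println(time.Now().Format("15:04:05.000"), "ping", name)
                }, refreshPeriod, jitterFactor, true, stopCh)
            }()
            time.Sleep(waitBetweenPods) // stagger the starts
        }

        time.Sleep(2 * refreshPeriod)
        close(stopCh)
    }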
@@ -55,51 +70,38 @@ func startPingers(pingers map[string]*Pinger) {
     }
 }

-// collectResults simply reads results from the results channel and saves them in a map
-func collectResults(resultsChan <-chan PingAllPodsResult) *models.CheckResults {
-    results := models.CheckResults{}
-    results.PodResults = make(map[string]models.PodResult)
-    go func() {
-        for response := range resultsChan {
-            var podIPv4 strfmt.IPv4
-            podIPv4.UnmarshalText([]byte(response.podIP))
-
-            // results.PodResults map will be read by processResults()
-            // to count the number of healthy and unhealthy nodes
-            // Since concurrent access to maps isn't safe from multiple
-            // goroutines, lock the mutex before update
-            resultsMux.Lock()
-            results.PodResults[response.podIP] = response.podResult
-            resultsMux.Unlock()
-        }
-    }()
-    return &results
+// updateCounters updates the counts of healthy and unhealthy nodes as the results come in
+func updateCounters(podName string, result *models.PodResult) {
+    // Get the previous value of ok
+    old := checkResults.PodResults[podName]
+    oldOk := (old.OK != nil && *old.OK)
+
+    // Check if the value of ok has changed
+    // If not, do nothing
+    if oldOk == *result.OK {
+        return
+    }
+
+    if *result.OK {
+        // The value was previously false and just became true
+        // Increment the counter
+        counterHealthy++
+    } else {
+        // The value was previously true and just became false
+        counterHealthy--
+    }
+    CountHealthyUnhealthyNodes(counterHealthy, float64(len(checkResults.PodResults))-counterHealthy)
 }

-// processResults goes through all the entries in the results map and counts
-// the number of healthy and unhealthy nodes. It then reports the current numbers
-func processResults(results *models.CheckResults) {
-    for {
-        var troublemakers []string
-        var counterHealthy, counterUnhealthy float64
-        resultsMux.Lock()
-        for podIP, value := range results.PodResults {
-            if *value.OK != true {
-                counterUnhealthy++
-                troublemakers = append(troublemakers, fmt.Sprintf("%s (%s)", podIP, value.HostIP.String()))
-            } else {
-                counterHealthy++
-            }
-        }
-        resultsMux.Unlock()
-        CountHealthyUnhealthyNodes(counterHealthy, counterUnhealthy)
-        if len(troublemakers) > 0 {
-            log.Println("Updater ran into trouble with these peers: ", troublemakers)
-        }
-        time.Sleep(time.Duration(GoldpingerConfig.RefreshInterval) * time.Second)
-    }
+// collectResults simply reads results from the results channel and saves them in a map
+func collectResults(resultsChan <-chan PingAllPodsResult) {
+    go func() {
+        for response := range resultsChan {
+            result := response.podResult
+            updateCounters(response.podName, &result)
+            checkResults.PodResults[response.podName] = result
+        }
+    }()
 }
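
The counter logic is worth spelling out: a pod that has never reported has a nil OK (from the zero-value models.PodResult seeded by initCheckResults), which updateCounters treats as unhealthy, so only genuine flips move the counter. The transitions this implies (derived from the branches above):

    previous OK    incoming OK    counterHealthy
    nil or false   false          unchanged
    nil or false   true           +1
    true           true           unchanged
    true           false          -1

The unhealthy count is derived as len(checkResults.PodResults) - counterHealthy, which is only correct because initCheckResults pre-populates an entry for every pod before any result arrives.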
@@ -108,14 +110,13 @@ func StartUpdater() {
         return
     }

-    pods := GoldpingerConfig.PodSelecter.SelectPods()
+    pods := SelectPods()
     zap.S().Infof("Got Pods: %+v", pods)

     // Create a channel for the results
     resultsChan := make(chan PingAllPodsResult, len(pods))

     pingers := getPingers(pods, resultsChan)
+    initCheckResults(pingers)
     startPingers(pingers)
-    results := collectResults(resultsChan)
-    go processResults(results)
+    collectResults(resultsChan)
 }
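
Note the wiring order in StartUpdater: initCheckResults runs before startPingers so that every pod already has a map entry, and counterHealthy is zeroed, before the first result arrives; after that, the single goroutine spawned by collectResults is the only writer to checkResults.PodResults, which is what allows the old resultsMux and the periodic processResults scan to be removed.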