Update the set of pingers at regular intervals from the k8s API server

Signed-off-by: Sachin Kamboj <skamboj1@bloomberg.net>
This commit is contained in:
Sachin Kamboj
2020-04-07 19:54:19 -04:00
parent bc313a7fbb
commit 9db241d67d
3 changed files with 243 additions and 96 deletions

View File

@@ -45,8 +45,7 @@ func CheckNeighboursNeighbours(ctx context.Context) *models.CheckAllResults {
type PingAllPodsResult struct {
podName string
podResult models.PodResult
hostIPv4 strfmt.IPv4
podIPv4 strfmt.IPv4
deleted bool
}
func pickPodHostIP(podIP, hostIP string) string {
@@ -99,20 +98,22 @@ func PingAllPods(pingAllCtx context.Context, pods map[string]*GoldpingerPod) *mo
start := time.Now()
// setup
var channelResult PingAllPodsResult
channelResult.podName = pod.Name
channelResult.hostIPv4.UnmarshalText([]byte(pod.HostIP))
channelResult.podIPv4.UnmarshalText([]byte(pod.PodIP))
channelResult := PingAllPodsResult{podName: pod.Name}
OK := false
var responseTime int64
client, err := getClient(pickPodHostIP(pod.PodIP, pod.HostIP))
var responseTime int64
var hostIPv4 strfmt.IPv4
var podIPv4 strfmt.IPv4
hostIPv4.UnmarshalText([]byte(pod.HostIP))
podIPv4.UnmarshalText([]byte(pod.PodIP))
client, err := getClient(pickPodHostIP(pod.PodIP, pod.HostIP))
if err != nil {
logger.Warn("Couldn't get a client for Ping", zap.Error(err))
channelResult.podResult = models.PodResult{
PodIP: channelResult.podIPv4,
HostIP: channelResult.hostIPv4,
PodIP: podIPv4,
HostIP: hostIPv4,
OK: &OK,
Error: err.Error(),
StatusCode: 500,
@@ -133,8 +134,8 @@ func PingAllPods(pingAllCtx context.Context, pods map[string]*GoldpingerPod) *mo
if OK {
logger.Debug("Pink Ok", zap.Int64("responseTime", responseTime))
channelResult.podResult = models.PodResult{
PodIP: channelResult.podIPv4,
HostIP: channelResult.hostIPv4,
PodIP: podIPv4,
HostIP: hostIPv4,
OK: &OK,
Response: resp.Payload,
StatusCode: 200,
@@ -144,8 +145,8 @@ func PingAllPods(pingAllCtx context.Context, pods map[string]*GoldpingerPod) *mo
} else {
logger.Warn("Ping returned error", zap.Int64("responseTime", responseTime), zap.Error(err))
channelResult.podResult = models.PodResult{
PodIP: channelResult.podIPv4,
HostIP: channelResult.hostIPv4,
PodIP: podIPv4,
HostIP: hostIPv4,
OK: &OK,
Error: err.Error(),
StatusCode: 504,

View File

@@ -1,3 +1,17 @@
// Copyright 2018 Bloomberg Finance L.P.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package goldpinger
import (
@@ -9,6 +23,7 @@ import (
apiclient "github.com/bloomberg/goldpinger/v3/pkg/client"
"github.com/bloomberg/goldpinger/v3/pkg/client/operations"
"github.com/bloomberg/goldpinger/v3/pkg/models"
"github.com/go-openapi/strfmt"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/apimachinery/pkg/util/wait"
)
@@ -19,7 +34,8 @@ type Pinger struct {
client *apiclient.Goldpinger
timeout time.Duration
histogram prometheus.Observer
result PingAllPodsResult
hostIPv4 strfmt.IPv4
podIPv4 strfmt.IPv4
resultsChan chan<- PingAllPodsResult
stopChan chan struct{}
logger *zap.Logger
@@ -49,25 +65,48 @@ func NewPinger(pod *GoldpingerPod, resultsChan chan<- PingAllPodsResult) *Pinger
),
}
// Initialize the result
p.result.hostIPv4.UnmarshalText([]byte(pod.HostIP))
p.result.podIPv4.UnmarshalText([]byte(pod.PodIP))
// Initialize the host/pod IPv4
p.hostIPv4.UnmarshalText([]byte(pod.HostIP))
p.podIPv4.UnmarshalText([]byte(pod.PodIP))
// Get a client for pinging the given pod
// On error, create a static pod result that does nothing
client, err := getClient(pickPodHostIP(pod.PodIP, pod.HostIP))
if err == nil {
p.client = client
} else {
OK := false
p.client = nil
p.result.podResult = models.PodResult{HostIP: p.result.hostIPv4, OK: &OK, Error: err.Error(), StatusCode: 500, ResponseTimeMs: 0}
}
return &p
}
// getClient returns a client that can be used to ping the given pod
// On error, it returns a static result
func (p *Pinger) getClient() (*apiclient.Goldpinger, error) {
if p.client != nil {
return p.client, nil
}
client, err := getClient(pickPodHostIP(p.pod.PodIP, p.pod.HostIP))
if err != nil {
p.logger.Warn("Could not get client", zap.Error(err))
OK := false
p.resultsChan <- PingAllPodsResult{
podName: p.pod.Name,
podResult: models.PodResult{
PodIP: p.podIPv4,
HostIP: p.hostIPv4,
OK: &OK,
Error: err.Error(),
StatusCode: 500,
ResponseTimeMs: 0,
},
}
return nil, err
}
p.client = client
return p.client, nil
}
// Ping makes a single ping request to the given pod
func (p *Pinger) Ping() {
client, err := p.getClient()
if err != nil {
return
}
CountCall("made", "ping")
start := time.Now()
@@ -75,39 +114,60 @@ func (p *Pinger) Ping() {
defer cancel()
params := operations.NewPingParamsWithContext(ctx)
resp, err := p.client.Operations.Ping(params)
resp, err := client.Operations.Ping(params)
responseTime := time.Since(start)
responseTimeMs := responseTime.Nanoseconds() / int64(time.Millisecond)
p.histogram.Observe(responseTime.Seconds())
OK := (err == nil)
if OK {
p.result.podResult = models.PodResult{
PodIP: p.result.podIPv4,
HostIP: p.result.hostIPv4,
OK: &OK,
Response: resp.Payload,
StatusCode: 200,
ResponseTimeMs: responseTimeMs,
p.resultsChan <- PingAllPodsResult{
podName: p.pod.Name,
podResult: models.PodResult{
PodIP: p.podIPv4,
HostIP: p.hostIPv4,
OK: &OK,
Response: resp.Payload,
StatusCode: 200,
ResponseTimeMs: responseTimeMs,
},
}
p.logger.Debug("Success pinging pod", zap.Duration("responseTime", responseTime))
} else {
p.result.podResult = models.PodResult{
PodIP: p.result.podIPv4,
HostIP: p.result.hostIPv4,
OK: &OK,
Error: err.Error(),
StatusCode: 504,
ResponseTimeMs: responseTimeMs,
p.resultsChan <- PingAllPodsResult{
podName: p.pod.Name,
podResult: models.PodResult{
PodIP: p.podIPv4,
HostIP: p.hostIPv4,
OK: &OK,
Error: err.Error(),
StatusCode: 504,
ResponseTimeMs: responseTimeMs,
},
}
p.logger.Warn("Ping returned error", zap.Duration("responseTime", responseTime), zap.Error(err))
CountError("ping")
}
p.resultsChan <- p.result
}
// PingContinuously continuously pings the given pod with a delay between
// `period` and `period + jitterFactor * period`
func (p *Pinger) PingContinuously(period time.Duration, jitterFactor float64) {
wait.JitterUntil(p.Ping, period, jitterFactor, false, p.stopChan)
func (p *Pinger) PingContinuously(initialWait time.Duration, period time.Duration, jitterFactor float64) {
p.logger.Info(
"Starting pinger",
zap.Duration("period", period),
zap.Duration("initialWait", initialWait),
zap.Float64("jitterFactor", jitterFactor),
)
timer := time.NewTimer(initialWait)
select {
case <-timer.C:
wait.JitterUntil(p.Ping, period, jitterFactor, false, p.stopChan)
case <-p.stopChan:
// Do nothing
}
// We are done, send a message on the results channel to delete this
p.resultsChan <- PingAllPodsResult{podName: p.pod.Name, deleted: true}
}

View File

@@ -15,6 +15,7 @@
package goldpinger
import (
"sync"
"time"
"go.uber.org/zap"
@@ -23,71 +24,156 @@ import (
)
// checkResults holds the latest results of checking the pods
var checkResults models.CheckResults
var checkResults = models.CheckResults{PodResults: make(map[string]models.PodResult)}
// counterHealthy is the number of healthy pods
var counterHealthy float64
var counterHealthy = float64(0.0)
// getPingers creates a new set of pingers for the given pods
// Each pinger is responsible for pinging a single pod and returns
// the results on the results channel
func getPingers(pods map[string]*GoldpingerPod, resultsChan chan<- PingAllPodsResult) map[string]*Pinger {
pingers := map[string]*Pinger{}
// checkResultsMux controls concurrent access to checkResults
var checkResultsMux = sync.Mutex{}
for podName, pod := range pods {
pingers[podName] = NewPinger(pod, resultsChan)
}
return pingers
// exists checks whether there is an existing pinger for the given pod
// returns true if:
// - there is already a pinger with the same name
// - the pinger has the same podIP
// - the pinger has the same hostIP
func exists(existingPods map[string]*GoldpingerPod, new *GoldpingerPod) bool {
old, exists := existingPods[new.Name]
return exists && (old.PodIP == new.PodIP) && (old.HostIP == new.HostIP)
}
// initCheckResults initializes the check results, which will be updated continuously
// as the results come in
func initCheckResults(pingers map[string]*Pinger) {
checkResults = models.CheckResults{}
checkResults.PodResults = make(map[string]models.PodResult)
for podName, pinger := range pingers {
checkResults.PodResults[podName] = pinger.result.podResult
}
counterHealthy = 0
}
// startPingers starts `n` goroutines to continuously ping all the given pods, one goroutine per pod
// It staggers the start of all the go-routines to prevent a thundering herd
func startPingers(pingers map[string]*Pinger) {
// updatePingers calls SelectPods() at regular intervals to get a new list of goldpinger pods to ping
// For each goldpinger pod, it then creates a pinger responsible for pinging it and returning the
// results on the result channel
func updatePingers(resultsChan chan<- PingAllPodsResult) {
// Important: This is the only goroutine that should have access to
// these maps since there is nothing controlling concurrent access
pingers := make(map[string]*Pinger)
existingPods := make(map[string]*GoldpingerPod)
refreshPeriod := time.Duration(GoldpingerConfig.RefreshInterval) * time.Second
waitBetweenPods := refreshPeriod / time.Duration(len(pingers))
for {
// Initialize deletedPods to all existing pods, we will remove
// any pods that should still exist from this list after we are done
// NOTE: This is *NOT* a copy of existingPods just a new variable name
// to make the intention/code clear and cleaner
deletedPods := existingPods
// New pods are brand new and haven't been seen before
newPods := make(map[string]*GoldpingerPod)
latest := SelectPods()
for podName, pod := range latest {
if exists(existingPods, pod) {
// This pod continues to exist in the latest iteration of the update
// without any changes
// Delete it from the set of pods that we wish to delete
delete(deletedPods, podName)
} else {
// This pod is brand new and has never been seen before
// Add it to the list of newPods
newPods[podName] = pod
}
}
// deletedPods now contains any pods that have either been deleted from the api-server
// *OR* weren't selected by our rendezvous hash
// *OR* had their host/pod IP changed. Remove those pingers
destroyPingers(pingers, deletedPods)
// Next create pingers for new pods
createPingers(pingers, newPods, resultsChan, refreshPeriod)
// Finally, just set existingPods to the latest and collect garbage
existingPods = latest
deletedPods = nil
newPods = nil
// Wait the given time before pinging
time.Sleep(refreshPeriod)
}
}
// createPingers allocates a new pinger object for each new goldpinger Pod that's been discovered
// It also:
// (a) initializes a result object in checkResults to store info on that pod
// (b) starts a new goroutines to continuously ping the given pod.
// Each new goroutine waits for a given time before starting the continuous ping
// to prevent a thundering herd
func createPingers(pingers map[string]*Pinger, newPods map[string]*GoldpingerPod, resultsChan chan<- PingAllPodsResult, refreshPeriod time.Duration) {
if len(newPods) == 0 {
// I have nothing to do
return
}
waitBetweenPods := refreshPeriod / time.Duration(len(newPods))
zap.L().Info(
"Starting Pingers",
"Starting pingers for new pods",
zap.Int("numNewPods", len(newPods)),
zap.Duration("refreshPeriod", refreshPeriod),
zap.Duration("waitPeriod", waitBetweenPods),
zap.Float64("JitterFactor", GoldpingerConfig.JitterFactor),
)
for _, p := range pingers {
go p.PingContinuously(refreshPeriod, GoldpingerConfig.JitterFactor)
time.Sleep(waitBetweenPods)
initialWait := time.Duration(0)
for podName, pod := range newPods {
pinger := NewPinger(pod, resultsChan)
pingers[podName] = pinger
go pinger.PingContinuously(initialWait, refreshPeriod, GoldpingerConfig.JitterFactor)
initialWait += waitBetweenPods
}
}
// destroyPingers takes a list of deleted pods and then for each pod in the list, it stops
// the goroutines that continuously pings that pod and then deletes the pod from the list of pingers
func destroyPingers(pingers map[string]*Pinger, deletedPods map[string]*GoldpingerPod) {
for podName, pod := range deletedPods {
zap.L().Info(
"Deleting pod from pingers",
zap.String("name", podName),
zap.String("podIP", pod.PodIP),
zap.String("hostIP", pod.HostIP),
)
pinger := pingers[podName]
// Close the channel to stop pinging
close(pinger.stopChan)
// delete from pingers
delete(pingers, podName)
}
}
// updateCounters updates the value of health and unhealthy nodes as the results come in
func updateCounters(podName string, result *models.PodResult) {
// Get the previous value of ok
old := checkResults.PodResults[podName]
oldOk := (old.OK != nil && *old.OK)
// Check if the value of ok has changed
// If not, do nothing
if oldOk == *result.OK {
return
}
if *result.OK {
old, oldExists := checkResults.PodResults[podName]
switch {
case result == nil:
// This pod was just deleted
// If the previous value seen was ok, decrement
// the counter
if oldExists && old.OK != nil && *old.OK {
counterHealthy--
}
case !oldExists || old.OK == nil:
// If there is no previous response, this is an initialization step for this pod name
// The default is unhealthy, so:
// - if this pod is healthy, increment the count
// - if this pod is unhealthy, do not touch the count
if *result.OK {
counterHealthy++
}
case *old.OK == *result.OK:
// The old value is equal to the new value
// Do nothing!
case *result.OK:
// The value was previously false and just became true
// Increment the counter
counterHealthy++
} else {
default:
// The value was previously true and just became false
// Decrement the counter
counterHealthy--
}
CountHealthyUnhealthyNodes(counterHealthy, float64(len(checkResults.PodResults))-counterHealthy)
@@ -95,13 +181,16 @@ func updateCounters(podName string, result *models.PodResult) {
// collectResults simply reads results from the results channel and saves them in a map
func collectResults(resultsChan <-chan PingAllPodsResult) {
go func() {
for response := range resultsChan {
for response := range resultsChan {
if response.deleted {
updateCounters(response.podName, nil)
delete(checkResults.PodResults, response.podName)
} else {
result := response.podResult
updateCounters(response.podName, &result)
checkResults.PodResults[response.podName] = result
}
}()
}
}
func StartUpdater() {
@@ -111,12 +200,9 @@ func StartUpdater() {
}
pods := SelectPods()
zap.S().Infof("Got Pods: %+v", pods)
// Create a channel for the results
resultsChan := make(chan PingAllPodsResult, len(pods))
pingers := getPingers(pods, resultsChan)
initCheckResults(pingers)
startPingers(pingers)
collectResults(resultsChan)
go updatePingers(resultsChan)
go collectResults(resultsChan)
}