diff --git a/pkg/goldpinger/client.go b/pkg/goldpinger/client.go index 2c00c25..12be2b2 100644 --- a/pkg/goldpinger/client.go +++ b/pkg/goldpinger/client.go @@ -45,8 +45,7 @@ func CheckNeighboursNeighbours(ctx context.Context) *models.CheckAllResults { type PingAllPodsResult struct { podName string podResult models.PodResult - hostIPv4 strfmt.IPv4 - podIPv4 strfmt.IPv4 + deleted bool } func pickPodHostIP(podIP, hostIP string) string { @@ -99,20 +98,22 @@ func PingAllPods(pingAllCtx context.Context, pods map[string]*GoldpingerPod) *mo start := time.Now() // setup - var channelResult PingAllPodsResult - channelResult.podName = pod.Name - channelResult.hostIPv4.UnmarshalText([]byte(pod.HostIP)) - channelResult.podIPv4.UnmarshalText([]byte(pod.PodIP)) - + channelResult := PingAllPodsResult{podName: pod.Name} OK := false - var responseTime int64 - client, err := getClient(pickPodHostIP(pod.PodIP, pod.HostIP)) + var responseTime int64 + var hostIPv4 strfmt.IPv4 + var podIPv4 strfmt.IPv4 + + hostIPv4.UnmarshalText([]byte(pod.HostIP)) + podIPv4.UnmarshalText([]byte(pod.PodIP)) + + client, err := getClient(pickPodHostIP(pod.PodIP, pod.HostIP)) if err != nil { logger.Warn("Couldn't get a client for Ping", zap.Error(err)) channelResult.podResult = models.PodResult{ - PodIP: channelResult.podIPv4, - HostIP: channelResult.hostIPv4, + PodIP: podIPv4, + HostIP: hostIPv4, OK: &OK, Error: err.Error(), StatusCode: 500, @@ -133,8 +134,8 @@ func PingAllPods(pingAllCtx context.Context, pods map[string]*GoldpingerPod) *mo if OK { logger.Debug("Pink Ok", zap.Int64("responseTime", responseTime)) channelResult.podResult = models.PodResult{ - PodIP: channelResult.podIPv4, - HostIP: channelResult.hostIPv4, + PodIP: podIPv4, + HostIP: hostIPv4, OK: &OK, Response: resp.Payload, StatusCode: 200, @@ -144,8 +145,8 @@ func PingAllPods(pingAllCtx context.Context, pods map[string]*GoldpingerPod) *mo } else { logger.Warn("Ping returned error", zap.Int64("responseTime", responseTime), zap.Error(err)) channelResult.podResult = models.PodResult{ - PodIP: channelResult.podIPv4, - HostIP: channelResult.hostIPv4, + PodIP: podIPv4, + HostIP: hostIPv4, OK: &OK, Error: err.Error(), StatusCode: 504, diff --git a/pkg/goldpinger/pinger.go b/pkg/goldpinger/pinger.go index c743265..6b06e3b 100644 --- a/pkg/goldpinger/pinger.go +++ b/pkg/goldpinger/pinger.go @@ -1,3 +1,17 @@ +// Copyright 2018 Bloomberg Finance L.P. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package goldpinger import ( @@ -9,6 +23,7 @@ import ( apiclient "github.com/bloomberg/goldpinger/v3/pkg/client" "github.com/bloomberg/goldpinger/v3/pkg/client/operations" "github.com/bloomberg/goldpinger/v3/pkg/models" + "github.com/go-openapi/strfmt" "github.com/prometheus/client_golang/prometheus" "k8s.io/apimachinery/pkg/util/wait" ) @@ -19,7 +34,8 @@ type Pinger struct { client *apiclient.Goldpinger timeout time.Duration histogram prometheus.Observer - result PingAllPodsResult + hostIPv4 strfmt.IPv4 + podIPv4 strfmt.IPv4 resultsChan chan<- PingAllPodsResult stopChan chan struct{} logger *zap.Logger @@ -49,25 +65,48 @@ func NewPinger(pod *GoldpingerPod, resultsChan chan<- PingAllPodsResult) *Pinger ), } - // Initialize the result - p.result.hostIPv4.UnmarshalText([]byte(pod.HostIP)) - p.result.podIPv4.UnmarshalText([]byte(pod.PodIP)) + // Initialize the host/pod IPv4 + p.hostIPv4.UnmarshalText([]byte(pod.HostIP)) + p.podIPv4.UnmarshalText([]byte(pod.PodIP)) - // Get a client for pinging the given pod - // On error, create a static pod result that does nothing - client, err := getClient(pickPodHostIP(pod.PodIP, pod.HostIP)) - if err == nil { - p.client = client - } else { - OK := false - p.client = nil - p.result.podResult = models.PodResult{HostIP: p.result.hostIPv4, OK: &OK, Error: err.Error(), StatusCode: 500, ResponseTimeMs: 0} - } return &p } +// getClient returns a client that can be used to ping the given pod +// On error, it returns a static result +func (p *Pinger) getClient() (*apiclient.Goldpinger, error) { + if p.client != nil { + return p.client, nil + } + + client, err := getClient(pickPodHostIP(p.pod.PodIP, p.pod.HostIP)) + if err != nil { + p.logger.Warn("Could not get client", zap.Error(err)) + OK := false + p.resultsChan <- PingAllPodsResult{ + podName: p.pod.Name, + podResult: models.PodResult{ + PodIP: p.podIPv4, + HostIP: p.hostIPv4, + OK: &OK, + Error: err.Error(), + StatusCode: 500, + ResponseTimeMs: 0, + }, + } + return nil, err + } + p.client = client + return p.client, nil +} + // Ping makes a single ping request to the given pod func (p *Pinger) Ping() { + client, err := p.getClient() + if err != nil { + return + } + CountCall("made", "ping") start := time.Now() @@ -75,39 +114,60 @@ func (p *Pinger) Ping() { defer cancel() params := operations.NewPingParamsWithContext(ctx) - resp, err := p.client.Operations.Ping(params) + resp, err := client.Operations.Ping(params) responseTime := time.Since(start) responseTimeMs := responseTime.Nanoseconds() / int64(time.Millisecond) p.histogram.Observe(responseTime.Seconds()) OK := (err == nil) if OK { - p.result.podResult = models.PodResult{ - PodIP: p.result.podIPv4, - HostIP: p.result.hostIPv4, - OK: &OK, - Response: resp.Payload, - StatusCode: 200, - ResponseTimeMs: responseTimeMs, + p.resultsChan <- PingAllPodsResult{ + podName: p.pod.Name, + podResult: models.PodResult{ + PodIP: p.podIPv4, + HostIP: p.hostIPv4, + OK: &OK, + Response: resp.Payload, + StatusCode: 200, + ResponseTimeMs: responseTimeMs, + }, } p.logger.Debug("Success pinging pod", zap.Duration("responseTime", responseTime)) } else { - p.result.podResult = models.PodResult{ - PodIP: p.result.podIPv4, - HostIP: p.result.hostIPv4, - OK: &OK, - Error: err.Error(), - StatusCode: 504, - ResponseTimeMs: responseTimeMs, + p.resultsChan <- PingAllPodsResult{ + podName: p.pod.Name, + podResult: models.PodResult{ + PodIP: p.podIPv4, + HostIP: p.hostIPv4, + OK: &OK, + Error: err.Error(), + StatusCode: 504, + ResponseTimeMs: responseTimeMs, + }, } p.logger.Warn("Ping returned error", zap.Duration("responseTime", responseTime), zap.Error(err)) CountError("ping") } - p.resultsChan <- p.result } // PingContinuously continuously pings the given pod with a delay between // `period` and `period + jitterFactor * period` -func (p *Pinger) PingContinuously(period time.Duration, jitterFactor float64) { - wait.JitterUntil(p.Ping, period, jitterFactor, false, p.stopChan) +func (p *Pinger) PingContinuously(initialWait time.Duration, period time.Duration, jitterFactor float64) { + p.logger.Info( + "Starting pinger", + zap.Duration("period", period), + zap.Duration("initialWait", initialWait), + zap.Float64("jitterFactor", jitterFactor), + ) + + timer := time.NewTimer(initialWait) + + select { + case <-timer.C: + wait.JitterUntil(p.Ping, period, jitterFactor, false, p.stopChan) + case <-p.stopChan: + // Do nothing + } + // We are done, send a message on the results channel to delete this + p.resultsChan <- PingAllPodsResult{podName: p.pod.Name, deleted: true} } diff --git a/pkg/goldpinger/updater.go b/pkg/goldpinger/updater.go index b12567b..813739a 100644 --- a/pkg/goldpinger/updater.go +++ b/pkg/goldpinger/updater.go @@ -15,6 +15,7 @@ package goldpinger import ( + "sync" "time" "go.uber.org/zap" @@ -23,71 +24,156 @@ import ( ) // checkResults holds the latest results of checking the pods -var checkResults models.CheckResults +var checkResults = models.CheckResults{PodResults: make(map[string]models.PodResult)} // counterHealthy is the number of healthy pods -var counterHealthy float64 +var counterHealthy = float64(0.0) -// getPingers creates a new set of pingers for the given pods -// Each pinger is responsible for pinging a single pod and returns -// the results on the results channel -func getPingers(pods map[string]*GoldpingerPod, resultsChan chan<- PingAllPodsResult) map[string]*Pinger { - pingers := map[string]*Pinger{} +// checkResultsMux controls concurrent access to checkResults +var checkResultsMux = sync.Mutex{} - for podName, pod := range pods { - pingers[podName] = NewPinger(pod, resultsChan) - } - return pingers +// exists checks whether there is an existing pinger for the given pod +// returns true if: +// - there is already a pinger with the same name +// - the pinger has the same podIP +// - the pinger has the same hostIP +func exists(existingPods map[string]*GoldpingerPod, new *GoldpingerPod) bool { + old, exists := existingPods[new.Name] + return exists && (old.PodIP == new.PodIP) && (old.HostIP == new.HostIP) } -// initCheckResults initializes the check results, which will be updated continuously -// as the results come in -func initCheckResults(pingers map[string]*Pinger) { - checkResults = models.CheckResults{} - checkResults.PodResults = make(map[string]models.PodResult) - for podName, pinger := range pingers { - checkResults.PodResults[podName] = pinger.result.podResult - } - counterHealthy = 0 -} - -// startPingers starts `n` goroutines to continuously ping all the given pods, one goroutine per pod -// It staggers the start of all the go-routines to prevent a thundering herd -func startPingers(pingers map[string]*Pinger) { +// updatePingers calls SelectPods() at regular intervals to get a new list of goldpinger pods to ping +// For each goldpinger pod, it then creates a pinger responsible for pinging it and returning the +// results on the result channel +func updatePingers(resultsChan chan<- PingAllPodsResult) { + // Important: This is the only goroutine that should have access to + // these maps since there is nothing controlling concurrent access + pingers := make(map[string]*Pinger) + existingPods := make(map[string]*GoldpingerPod) refreshPeriod := time.Duration(GoldpingerConfig.RefreshInterval) * time.Second - waitBetweenPods := refreshPeriod / time.Duration(len(pingers)) + + for { + // Initialize deletedPods to all existing pods, we will remove + // any pods that should still exist from this list after we are done + // NOTE: This is *NOT* a copy of existingPods just a new variable name + // to make the intention/code clear and cleaner + deletedPods := existingPods + + // New pods are brand new and haven't been seen before + newPods := make(map[string]*GoldpingerPod) + + latest := SelectPods() + for podName, pod := range latest { + if exists(existingPods, pod) { + // This pod continues to exist in the latest iteration of the update + // without any changes + // Delete it from the set of pods that we wish to delete + delete(deletedPods, podName) + } else { + // This pod is brand new and has never been seen before + // Add it to the list of newPods + newPods[podName] = pod + } + } + + // deletedPods now contains any pods that have either been deleted from the api-server + // *OR* weren't selected by our rendezvous hash + // *OR* had their host/pod IP changed. Remove those pingers + destroyPingers(pingers, deletedPods) + + // Next create pingers for new pods + createPingers(pingers, newPods, resultsChan, refreshPeriod) + + // Finally, just set existingPods to the latest and collect garbage + existingPods = latest + deletedPods = nil + newPods = nil + + // Wait the given time before pinging + time.Sleep(refreshPeriod) + } +} + +// createPingers allocates a new pinger object for each new goldpinger Pod that's been discovered +// It also: +// (a) initializes a result object in checkResults to store info on that pod +// (b) starts a new goroutines to continuously ping the given pod. +// Each new goroutine waits for a given time before starting the continuous ping +// to prevent a thundering herd +func createPingers(pingers map[string]*Pinger, newPods map[string]*GoldpingerPod, resultsChan chan<- PingAllPodsResult, refreshPeriod time.Duration) { + if len(newPods) == 0 { + // I have nothing to do + return + } + waitBetweenPods := refreshPeriod / time.Duration(len(newPods)) zap.L().Info( - "Starting Pingers", + "Starting pingers for new pods", + zap.Int("numNewPods", len(newPods)), zap.Duration("refreshPeriod", refreshPeriod), zap.Duration("waitPeriod", waitBetweenPods), zap.Float64("JitterFactor", GoldpingerConfig.JitterFactor), ) - for _, p := range pingers { - go p.PingContinuously(refreshPeriod, GoldpingerConfig.JitterFactor) - time.Sleep(waitBetweenPods) + initialWait := time.Duration(0) + for podName, pod := range newPods { + pinger := NewPinger(pod, resultsChan) + pingers[podName] = pinger + go pinger.PingContinuously(initialWait, refreshPeriod, GoldpingerConfig.JitterFactor) + initialWait += waitBetweenPods + } +} + +// destroyPingers takes a list of deleted pods and then for each pod in the list, it stops +// the goroutines that continuously pings that pod and then deletes the pod from the list of pingers +func destroyPingers(pingers map[string]*Pinger, deletedPods map[string]*GoldpingerPod) { + for podName, pod := range deletedPods { + zap.L().Info( + "Deleting pod from pingers", + zap.String("name", podName), + zap.String("podIP", pod.PodIP), + zap.String("hostIP", pod.HostIP), + ) + pinger := pingers[podName] + + // Close the channel to stop pinging + close(pinger.stopChan) + + // delete from pingers + delete(pingers, podName) } } // updateCounters updates the value of health and unhealthy nodes as the results come in func updateCounters(podName string, result *models.PodResult) { // Get the previous value of ok - old := checkResults.PodResults[podName] - oldOk := (old.OK != nil && *old.OK) - - // Check if the value of ok has changed - // If not, do nothing - if oldOk == *result.OK { - return - } - - if *result.OK { + old, oldExists := checkResults.PodResults[podName] + switch { + case result == nil: + // This pod was just deleted + // If the previous value seen was ok, decrement + // the counter + if oldExists && old.OK != nil && *old.OK { + counterHealthy-- + } + case !oldExists || old.OK == nil: + // If there is no previous response, this is an initialization step for this pod name + // The default is unhealthy, so: + // - if this pod is healthy, increment the count + // - if this pod is unhealthy, do not touch the count + if *result.OK { + counterHealthy++ + } + case *old.OK == *result.OK: + // The old value is equal to the new value + // Do nothing! + case *result.OK: // The value was previously false and just became true // Increment the counter counterHealthy++ - } else { + default: // The value was previously true and just became false + // Decrement the counter counterHealthy-- } CountHealthyUnhealthyNodes(counterHealthy, float64(len(checkResults.PodResults))-counterHealthy) @@ -95,13 +181,16 @@ func updateCounters(podName string, result *models.PodResult) { // collectResults simply reads results from the results channel and saves them in a map func collectResults(resultsChan <-chan PingAllPodsResult) { - go func() { - for response := range resultsChan { + for response := range resultsChan { + if response.deleted { + updateCounters(response.podName, nil) + delete(checkResults.PodResults, response.podName) + } else { result := response.podResult updateCounters(response.podName, &result) checkResults.PodResults[response.podName] = result } - }() + } } func StartUpdater() { @@ -111,12 +200,9 @@ func StartUpdater() { } pods := SelectPods() - zap.S().Infof("Got Pods: %+v", pods) // Create a channel for the results resultsChan := make(chan PingAllPodsResult, len(pods)) - pingers := getPingers(pods, resultsChan) - initCheckResults(pingers) - startPingers(pingers) - collectResults(resultsChan) + go updatePingers(resultsChan) + go collectResults(resultsChan) }