Address PR #164 review feedback

Concurrent HTTP + UDP pings:
  HTTP ping and UDP probe now run in separate goroutines via
  sync.WaitGroup, so UDP timeout doesn't add to the ping cycle
  latency. (skamboj on pinger.go:124)

Remove duplicate log:
  Removed the "UDP echo listener started" log from main.go since
  StartUDPListener already logs it. (skamboj on main.go:191)

Prometheus base units (seconds):
  Renamed goldpinger_peers_udp_rtt_ms back to goldpinger_peers_udp_rtt_s
  with sub-millisecond histogram buckets (.0001s to 1s), per Prometheus
  naming conventions. RTT is computed in seconds internally and only
  converted to ms for the JSON API. (skamboj on stats.go:150)

Rename path_length to hop_count:
  goldpinger_peers_path_length → goldpinger_peers_hop_count, and
  SetPeerPathLength → SetPeerHopCount. (skamboj on stats.go:139)

UDP buffer constant and packet size clamping:
  Added udpMaxPacketSize=1500 constant, documented as standard Ethernet
  MTU — the largest UDP payload that survives most networks without
  fragmentation. Used for both listener and prober receive buffers.
  ProbeUDP now clamps UDP_PACKET_SIZE to udpMaxPacketSize to prevent
  silent truncation if someone configures a size > MTU.
  (skamboj on udp_probe.go:54)

Guard count=0:
  ProbeUDP returns an error immediately if count <= 0 instead of
  dividing by zero. (skamboj on udp_probe.go:176)

UDP error counter:
  Added goldpinger_udp_errors_total counter (labels: goldpinger_instance,
  host). CountUDPError is called on dial failures and send errors.
  (skamboj on udp_probe.go:115)

Test: random source port for full loss:
  TestProbeUDP_FullLoss now binds an ephemeral port and closes it,
  instead of assuming port 19999 is free. (skamboj on udp_probe_test.go:56)

Test: partial loss validation:
  New TestProbeUDP_PartialLoss uses a lossy echo listener that drops
  every Nth packet to validate loss calculations are exact:
    drop every 2nd → 50.0%, every 3rd → 33.3%,
    every 5th → 20.0%, every 10th → 10.0%
  (skamboj on udp_probe_test.go:96)

Test: zero count:
  New TestProbeUDP_ZeroCount verifies error is returned for count=0.

Test results:
```
=== RUN   TestProbeUDP_NoLoss
    udp_probe_test.go:88: avg UDP RTT: 0.0816 ms
--- PASS: TestProbeUDP_NoLoss (0.00s)
=== RUN   TestProbeUDP_FullLoss
--- PASS: TestProbeUDP_FullLoss (0.00s)
=== RUN   TestProbeUDP_PartialLoss
=== RUN   TestProbeUDP_PartialLoss/drop_every_2nd_(50%)
    udp_probe_test.go:134: loss: 50.0% (expected 50.0%)
=== RUN   TestProbeUDP_PartialLoss/drop_every_3rd_(33.3%)
    udp_probe_test.go:134: loss: 33.3% (expected 33.3%)
=== RUN   TestProbeUDP_PartialLoss/drop_every_5th_(20%)
    udp_probe_test.go:134: loss: 20.0% (expected 20.0%)
=== RUN   TestProbeUDP_PartialLoss/drop_every_10th_(10%)
    udp_probe_test.go:134: loss: 10.0% (expected 10.0%)
--- PASS: TestProbeUDP_PartialLoss (8.00s)
=== RUN   TestProbeUDP_ZeroCount
--- PASS: TestProbeUDP_ZeroCount (0.00s)
=== RUN   TestProbeUDP_PacketFormat
--- PASS: TestProbeUDP_PacketFormat (0.00s)
=== RUN   TestEstimateHops
--- PASS: TestEstimateHops (0.00s)
PASS
```

Signed-off-by: Cooper Ry Lees <me@cooperlees.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Cooper Ry Lees
2026-04-02 19:37:52 +00:00
parent 832bc7b598
commit 641b658f23
6 changed files with 187 additions and 60 deletions

View File

@@ -325,8 +325,9 @@ This adds three Prometheus metrics:
```sh
goldpinger_peers_loss_pct # gauge: UDP packet loss percentage (0-100)
goldpinger_peers_path_length # gauge: estimated hop count
goldpinger_peers_udp_rtt_ms # histogram: UDP round-trip time in milliseconds
goldpinger_peers_hop_count # gauge: estimated hop count
goldpinger_peers_udp_rtt_s # histogram: UDP round-trip time in seconds
goldpinger_udp_errors_total # counter: UDP probe errors
```
Links with partial loss are shown as yellow edges in the graph UI, and edge labels display the UDP RTT instead of HTTP latency when available.
@@ -367,8 +368,8 @@ goldpinger_nodes_health_total
goldpinger_stats_total
goldpinger_errors_total
goldpinger_peers_loss_pct # (UDP probe, when enabled)
goldpinger_peers_path_length # (UDP probe, when enabled)
goldpinger_peers_udp_rtt_ms_* # (UDP probe, when enabled)
goldpinger_peers_hop_count # (UDP probe, when enabled)
goldpinger_peers_udp_rtt_s_* # (UDP probe, when enabled)
```
### Grafana

View File

@@ -188,11 +188,6 @@ func main() {
if goldpinger.GoldpingerConfig.UDPEnabled {
go goldpinger.StartUDPListener(goldpinger.GoldpingerConfig.UDPPort)
logger.Info("UDP echo listener started",
zap.Int("port", goldpinger.GoldpingerConfig.UDPPort),
zap.Int("packetCount", goldpinger.GoldpingerConfig.UDPPacketCount),
zap.Int("packetSize", goldpinger.GoldpingerConfig.UDPPacketSize),
)
}
logger.Info("All good, starting serving the API")

View File

@@ -16,6 +16,7 @@ package goldpinger
import (
"context"
"sync"
"time"
"go.uber.org/zap"
@@ -111,6 +112,24 @@ func (p *Pinger) Ping() {
CountCall("made", "ping")
start := time.Now()
// Run HTTP ping and UDP probe concurrently
var udpResult UDPProbeResult
var wg sync.WaitGroup
if GoldpingerConfig.UDPEnabled {
wg.Add(1)
go func() {
defer wg.Done()
targetIP := pickPodHostIP(p.pod.PodIP, p.pod.HostIP)
udpResult = ProbeUDP(
targetIP,
GoldpingerConfig.UDPPort,
GoldpingerConfig.UDPPacketCount,
GoldpingerConfig.UDPPacketSize,
GoldpingerConfig.UDPTimeout,
)
}()
}
ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
defer cancel()
@@ -120,37 +139,31 @@ func (p *Pinger) Ping() {
responseTimeMs := responseTime.Nanoseconds() / int64(time.Millisecond)
p.histogram.Observe(responseTime.Seconds())
// Wait for UDP probe to complete
wg.Wait()
OK := (err == nil)
// Run UDP probe if enabled
var lossPct float64
var pathLength int32
var hopCount int32
var udpRttMs float64
if GoldpingerConfig.UDPEnabled {
targetIP := pickPodHostIP(p.pod.PodIP, p.pod.HostIP)
udpResult := ProbeUDP(
targetIP,
GoldpingerConfig.UDPPort,
GoldpingerConfig.UDPPacketCount,
GoldpingerConfig.UDPPacketSize,
GoldpingerConfig.UDPTimeout,
)
lossPct = udpResult.LossPct
pathLength = udpResult.PathLength
udpRttMs = udpResult.AvgRttMs
hopCount = udpResult.HopCount
udpRttMs = udpResult.AvgRttS * 1000.0
if udpResult.Err != nil {
p.logger.Warn("UDP probe error", zap.Error(udpResult.Err))
} else {
p.logger.Debug("UDP probe complete",
zap.Float64("lossPct", lossPct),
zap.Int32("pathLength", pathLength),
zap.Int32("hopCount", hopCount),
zap.Float64("udpRttMs", udpRttMs),
)
}
SetPeerLossPct(p.pod.HostIP, p.pod.PodIP, lossPct)
SetPeerPathLength(p.pod.HostIP, p.pod.PodIP, pathLength)
if udpRttMs > 0 {
ObservePeerUDPRtt(p.pod.HostIP, p.pod.PodIP, udpRttMs)
SetPeerHopCount(p.pod.HostIP, p.pod.PodIP, hopCount)
if udpResult.AvgRttS > 0 {
ObservePeerUDPRtt(p.pod.HostIP, p.pod.PodIP, udpResult.AvgRttS)
}
}
@@ -166,7 +179,7 @@ func (p *Pinger) Ping() {
StatusCode: 200,
ResponseTimeMs: responseTimeMs,
LossPct: lossPct,
PathLength: pathLength,
PathLength: hopCount,
UDPRttMs: udpRttMs,
},
}
@@ -183,7 +196,7 @@ func (p *Pinger) Ping() {
StatusCode: 504,
ResponseTimeMs: responseTimeMs,
LossPct: lossPct,
PathLength: pathLength,
PathLength: hopCount,
UDPRttMs: udpRttMs,
},
}

View File

@@ -134,9 +134,9 @@ var (
"pod_ip",
},
)
goldpingerPeersPathLength = prometheus.NewGaugeVec(
goldpingerPeersHopCount = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "goldpinger_peers_path_length",
Name: "goldpinger_peers_hop_count",
Help: "Estimated network hop count to peer from UDP TTL",
},
[]string{
@@ -147,9 +147,9 @@ var (
)
goldpingerPeersUDPRtt = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "goldpinger_peers_udp_rtt_ms",
Help: "Histogram of UDP round-trip times to peers in milliseconds",
Buckets: []float64{.1, .25, .5, 1, 2.5, 5, 10, 25, 50, 100, 250, 500, 1000},
Name: "goldpinger_peers_udp_rtt_s",
Help: "Histogram of UDP round-trip times to peers in seconds",
Buckets: []float64{.0001, .00025, .0005, .001, .0025, .005, .01, .025, .05, .1, .25, .5, 1},
},
[]string{
"goldpinger_instance",
@@ -157,6 +157,16 @@ var (
"pod_ip",
},
)
goldpingerUDPErrorsCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "goldpinger_udp_errors_total",
Help: "Statistics of UDP probe errors per instance",
},
[]string{
"goldpinger_instance",
"host",
},
)
bootTime = time.Now()
)
@@ -171,8 +181,9 @@ func init() {
prometheus.MustRegister(goldPingerHttpErrorsCounter)
prometheus.MustRegister(goldPingerTcpErrorsCounter)
prometheus.MustRegister(goldpingerPeersLossPct)
prometheus.MustRegister(goldpingerPeersPathLength)
prometheus.MustRegister(goldpingerPeersHopCount)
prometheus.MustRegister(goldpingerPeersUDPRtt)
prometheus.MustRegister(goldpingerUDPErrorsCounter)
zap.L().Info("Metrics setup - see /metrics")
}
@@ -256,29 +267,37 @@ func SetPeerLossPct(hostIP, podIP string, lossPct float64) {
).Set(lossPct)
}
// SetPeerPathLength sets the estimated hop count gauge for a peer
func SetPeerPathLength(hostIP, podIP string, pathLength int32) {
goldpingerPeersPathLength.WithLabelValues(
// SetPeerHopCount sets the estimated hop count gauge for a peer
func SetPeerHopCount(hostIP, podIP string, hopCount int32) {
goldpingerPeersHopCount.WithLabelValues(
GoldpingerConfig.Hostname,
hostIP,
podIP,
).Set(float64(pathLength))
).Set(float64(hopCount))
}
// DeletePeerUDPMetrics removes stale UDP metric labels for a destroyed peer
func DeletePeerUDPMetrics(hostIP, podIP string) {
goldpingerPeersLossPct.DeleteLabelValues(GoldpingerConfig.Hostname, hostIP, podIP)
goldpingerPeersPathLength.DeleteLabelValues(GoldpingerConfig.Hostname, hostIP, podIP)
goldpingerPeersHopCount.DeleteLabelValues(GoldpingerConfig.Hostname, hostIP, podIP)
goldpingerPeersUDPRtt.DeleteLabelValues(GoldpingerConfig.Hostname, hostIP, podIP)
}
// ObservePeerUDPRtt records a UDP RTT observation in milliseconds
func ObservePeerUDPRtt(hostIP, podIP string, rttMs float64) {
// ObservePeerUDPRtt records a UDP RTT observation in seconds
func ObservePeerUDPRtt(hostIP, podIP string, rttS float64) {
goldpingerPeersUDPRtt.WithLabelValues(
GoldpingerConfig.Hostname,
hostIP,
podIP,
).Observe(rttMs)
).Observe(rttS)
}
// CountUDPError counts instances of UDP probe errors
func CountUDPError(host string) {
goldpingerUDPErrorsCounter.WithLabelValues(
GoldpingerConfig.Hostname,
host,
).Inc()
}
// returns a timer for easy observing of the durations of calls to kubernetes API

View File

@@ -29,14 +29,21 @@ import (
const (
udpMagic = 0x47504E47 // "GPNG"
udpHeaderSize = 16 // 4 magic + 4 seq + 8 timestamp
// udpMaxPacketSize is the maximum UDP packet we will read. Sized to the
// standard Ethernet MTU (1500 bytes) which is the largest packet that
// will survive most networks without fragmentation. Any configured
// UDP_PACKET_SIZE larger than this will be clamped to this value to
// avoid silent truncation on the receive side.
udpMaxPacketSize = 1500
)
// UDPProbeResult holds the results of a UDP probe to a peer
type UDPProbeResult struct {
LossPct float64
PathLength int32
AvgRttMs float64
Err error
LossPct float64
HopCount int32
AvgRttS float64
Err error
}
// StartUDPListener starts a UDP echo listener on the given port.
@@ -51,7 +58,7 @@ func StartUDPListener(port int) {
zap.L().Info("UDP echo listener started", zap.String("addr", addr))
buf := make([]byte, 1500)
buf := make([]byte, udpMaxPacketSize)
for {
n, remoteAddr, err := pc.ReadFrom(buf)
if err != nil {
@@ -75,13 +82,20 @@ func StartUDPListener(port int) {
// ProbeUDP sends count UDP packets to the target and measures loss and hop count.
func ProbeUDP(targetIP string, port, count, size int, timeout time.Duration) UDPProbeResult {
if count <= 0 {
return UDPProbeResult{Err: fmt.Errorf("packet count must be > 0, got %d", count)}
}
if size < udpHeaderSize {
size = udpHeaderSize
}
if size > udpMaxPacketSize {
size = udpMaxPacketSize
}
addr := net.JoinHostPort(targetIP, strconv.Itoa(port))
conn, err := net.Dial("udp", addr)
if err != nil {
CountUDPError(targetIP)
return UDPProbeResult{LossPct: 100, Err: fmt.Errorf("dial: %w", err)}
}
defer conn.Close()
@@ -113,6 +127,7 @@ func ProbeUDP(targetIP string, port, count, size int, timeout time.Duration) UDP
binary.BigEndian.PutUint64(pkt[8:16], uint64(time.Now().UnixNano()))
_, err := conn.Write(pkt)
if err != nil {
CountUDPError(targetIP)
zap.L().Debug("UDP send error", zap.Int("seq", i), zap.Error(err))
}
}
@@ -123,7 +138,7 @@ func ProbeUDP(targetIP string, port, count, size int, timeout time.Duration) UDP
deadline := time.Now().Add(timeout)
conn.SetReadDeadline(deadline)
recvBuf := make([]byte, 1500)
recvBuf := make([]byte, udpMaxPacketSize)
if isIPv6 {
p := ipv6.NewPacketConn(conn.(*net.UDPConn))
@@ -175,20 +190,20 @@ func ProbeUDP(targetIP string, port, count, size int, timeout time.Duration) UDP
lossPct := float64(count-received) / float64(count) * 100.0
var pathLength int32
var hopCount int32
if ttlFound {
pathLength = estimateHops(ttlValue)
hopCount = estimateHops(ttlValue)
}
var avgRttMs float64
var avgRttS float64
if received > 0 {
avgRttMs = float64(totalRttNs) / float64(received) / 1e6
avgRttS = float64(totalRttNs) / float64(received) / 1e9
}
return UDPProbeResult{
LossPct: lossPct,
PathLength: pathLength,
AvgRttMs: avgRttMs,
LossPct: lossPct,
HopCount: hopCount,
AvgRttS: avgRttS,
}
}

View File

@@ -3,6 +3,7 @@ package goldpinger
import (
"encoding/binary"
"net"
"sync/atomic"
"testing"
"time"
)
@@ -16,7 +17,7 @@ func startTestEchoListener(t *testing.T) (int, func()) {
port := pc.LocalAddr().(*net.UDPAddr).Port
go func() {
buf := make([]byte, 1500)
buf := make([]byte, udpMaxPacketSize)
for {
n, addr, err := pc.ReadFrom(buf)
if err != nil {
@@ -34,6 +35,42 @@ func startTestEchoListener(t *testing.T) (int, func()) {
return port, func() { pc.Close() }
}
// startLossyEchoListener echoes back packets but drops every dropEveryN-th
// packet (1-indexed). For example, dropEveryN=3 drops packets 3, 6, 9, etc.
func startLossyEchoListener(t *testing.T, dropEveryN int) (int, func()) {
t.Helper()
pc, err := net.ListenPacket("udp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
port := pc.LocalAddr().(*net.UDPAddr).Port
var counter atomic.Int64
go func() {
buf := make([]byte, udpMaxPacketSize)
for {
n, addr, err := pc.ReadFrom(buf)
if err != nil {
return
}
if n >= udpHeaderSize {
magic := binary.BigEndian.Uint32(buf[0:4])
if magic == udpMagic {
seq := counter.Add(1)
if seq%int64(dropEveryN) == 0 {
// Drop this packet — don't echo
continue
}
pc.WriteTo(buf[:n], addr)
}
}
}
}()
return port, func() { pc.Close() }
}
func TestProbeUDP_NoLoss(t *testing.T) {
port, cleanup := startTestEchoListener(t)
defer cleanup()
@@ -45,20 +82,67 @@ func TestProbeUDP_NoLoss(t *testing.T) {
if result.LossPct != 0 {
t.Errorf("expected 0%% loss, got %.1f%%", result.LossPct)
}
if result.AvgRttMs <= 0 {
t.Errorf("expected positive RTT, got %.4f ms", result.AvgRttMs)
if result.AvgRttS <= 0 {
t.Errorf("expected positive RTT, got %.6f s", result.AvgRttS)
}
t.Logf("avg UDP RTT: %.4f ms", result.AvgRttMs)
t.Logf("avg UDP RTT: %.4f ms", result.AvgRttS*1000)
}
func TestProbeUDP_FullLoss(t *testing.T) {
// Use a port with no listener
result := ProbeUDP("127.0.0.1", 19999, 5, 64, 200*time.Millisecond)
// Bind a port then close it so nothing is listening.
// This avoids assuming a hardcoded port like 19999 is free.
pc, err := net.ListenPacket("udp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
port := pc.LocalAddr().(*net.UDPAddr).Port
pc.Close()
result := ProbeUDP("127.0.0.1", port, 5, 64, 200*time.Millisecond)
if result.LossPct != 100 {
t.Errorf("expected 100%% loss, got %.1f%%", result.LossPct)
}
}
func TestProbeUDP_PartialLoss(t *testing.T) {
tests := []struct {
name string
count int
dropEveryN int
expectedPct float64
}{
{"drop every 2nd (50%)", 10, 2, 50.0},
{"drop every 3rd (33.3%)", 9, 3, 100.0 / 3.0},
{"drop every 5th (20%)", 10, 5, 20.0},
{"drop every 10th (10%)", 10, 10, 10.0},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
port, cleanup := startLossyEchoListener(t, tt.dropEveryN)
defer cleanup()
result := ProbeUDP("127.0.0.1", port, tt.count, 64, 2*time.Second)
if result.Err != nil {
t.Fatalf("unexpected error: %v", result.Err)
}
// Allow small floating point tolerance
diff := result.LossPct - tt.expectedPct
if diff < -0.1 || diff > 0.1 {
t.Errorf("expected %.1f%% loss, got %.1f%%", tt.expectedPct, result.LossPct)
}
t.Logf("loss: %.1f%% (expected %.1f%%)", result.LossPct, tt.expectedPct)
})
}
}
func TestProbeUDP_ZeroCount(t *testing.T) {
result := ProbeUDP("127.0.0.1", 12345, 0, 64, 200*time.Millisecond)
if result.Err == nil {
t.Error("expected error for count=0, got nil")
}
}
func TestProbeUDP_PacketFormat(t *testing.T) {
pkt := make([]byte, 64)
binary.BigEndian.PutUint32(pkt[0:4], udpMagic)