4 Commits

Author SHA1 Message Date
skamboj
fdb70968b8 Merge pull request #166 from cooperlees/master
Add UDP sequence number tracking for duplicate and out-of-order metrics
2026-04-20 10:03:22 -04:00
Sachin Kamboj
f4217e9241 Increment the version
Signed-off-by: Sachin Kamboj <skamboj1@bloomberg.net>
2026-04-20 08:41:35 -04:00
Cooper Ry Lees
7aaa34319c Clean up duplicate and out-of-order counters on pinger teardown
DeletePeerUDPMetrics was missing cleanup for the two counters added
in the previous commit (goldpinger_udp_duplicates_total and
goldpinger_udp_out_of_order_total). Without this, rolled pods would
leave stale counter label sets in /metrics.

Add TestDeletePeerUDPMetrics_CleansAllPerPeerMetrics which populates
every per-peer UDP metric, calls DeletePeerUDPMetrics, then verifies
all label sets are removed. If a future per-peer metric is added to
the perPeerCollectors map but not to DeletePeerUDPMetrics, the test
fails with: "metric X still has 1 label set(s) after
DeletePeerUDPMetrics — add it to the cleanup function"

Verified the test catches regressions by temporarily removing one
cleanup line — test correctly fails.

Signed-off-by: Cooper Ry Lees <me@cooperlees.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 13:33:43 +00:00
Cooper Ry Lees
943609d9a5 Add UDP sequence number tracking for duplicate and out-of-order detection
Track received sequence numbers in a map during each UDP probe to
detect duplicate replies (same seq seen twice) and out-of-order
delivery (seq lower than the highest previously seen).

The receive loop now uses a recvState struct with a processPacket
method that validates magic, checks for duplicate seq numbers, and
tracks ordering. Duplicates are not counted toward the received
total, and the loop allows up to 2*count iterations to handle them
without prematurely timing out.

New Prometheus counters:
  - goldpinger_udp_duplicates_total    — duplicate reply packets
  - goldpinger_udp_out_of_order_total  — out-of-order reply packets

Both are cumulative counters with labels (goldpinger_instance,
host_ip, pod_ip), incremented per-probe by the number of events
detected. Non-zero values indicate network-level packet duplication
or path asymmetry worth investigating.

New tests:
  - TestProbeUDP_Duplicates: echo listener sends every packet twice,
    verifies duplicates are detected and don't inflate received count
  - TestProbeUDP_OutOfOrder: echo listener buffers pairs and returns
    them in reverse order, verifies out-of-order is detected

Test results:
```
=== RUN   TestProbeUDP_NoLoss
    udp_probe_test.go:158: avg UDP RTT: 0.1011 ms
--- PASS: TestProbeUDP_NoLoss (0.00s)
=== RUN   TestProbeUDP_FullLoss
--- PASS: TestProbeUDP_FullLoss (0.00s)
=== RUN   TestProbeUDP_PartialLoss
=== RUN   TestProbeUDP_PartialLoss/drop_every_2nd_(50%)
    udp_probe_test.go:204: loss: 50.0% (expected 50.0%)
=== RUN   TestProbeUDP_PartialLoss/drop_every_3rd_(33.3%)
    udp_probe_test.go:204: loss: 33.3% (expected 33.3%)
=== RUN   TestProbeUDP_PartialLoss/drop_every_5th_(20%)
    udp_probe_test.go:204: loss: 20.0% (expected 20.0%)
=== RUN   TestProbeUDP_PartialLoss/drop_every_10th_(10%)
    udp_probe_test.go:204: loss: 10.0% (expected 10.0%)
--- PASS: TestProbeUDP_PartialLoss (8.01s)
=== RUN   TestProbeUDP_ZeroCount
--- PASS: TestProbeUDP_ZeroCount (0.00s)
=== RUN   TestProbeUDP_PacketFormat
--- PASS: TestProbeUDP_PacketFormat (0.00s)
=== RUN   TestProbeUDP_Duplicates
    udp_probe_test.go:246: duplicates detected: 4
--- PASS: TestProbeUDP_Duplicates (0.00s)
=== RUN   TestProbeUDP_OutOfOrder
    udp_probe_test.go:263: out-of-order detected: 5, duplicates: 0
--- PASS: TestProbeUDP_OutOfOrder (0.00s)
=== RUN   TestEstimateHops
--- PASS: TestEstimateHops (0.00s)
PASS
```

Signed-off-by: Cooper Ry Lees <me@cooperlees.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-03 17:24:45 +00:00
8 changed files with 313 additions and 51 deletions

View File

@@ -1,5 +1,5 @@
name ?= goldpinger
version ?= v3.11.0
version ?= v3.11.1
bin ?= goldpinger
pkg ?= "github.com/bloomberg/goldpinger"
tag = $(name):$(version)

View File

@@ -328,6 +328,8 @@ goldpinger_peers_loss_pct # gauge: UDP packet loss percentage (0-100)
goldpinger_peers_hop_count # gauge: estimated hop count
goldpinger_peers_udp_rtt_s # histogram: UDP round-trip time in seconds
goldpinger_udp_errors_total # counter: UDP probe errors
goldpinger_udp_duplicates_total # counter: duplicate UDP reply packets
goldpinger_udp_out_of_order_total # counter: out-of-order UDP reply packets
```
Links with partial loss are shown as yellow edges in the graph UI, and edge labels display the UDP RTT instead of HTTP latency when available.
@@ -370,6 +372,8 @@ goldpinger_errors_total
goldpinger_peers_loss_pct # (UDP probe, when enabled)
goldpinger_peers_hop_count # (UDP probe, when enabled)
goldpinger_peers_udp_rtt_s_* # (UDP probe, when enabled)
goldpinger_udp_duplicates_total # (UDP probe, when enabled)
goldpinger_udp_out_of_order_total # (UDP probe, when enabled)
```
### Grafana

View File

@@ -1,7 +1,7 @@
apiVersion: v1
name: goldpinger
appVersion: "3.11.0"
version: 1.1.0
appVersion: "3.11.1"
version: 1.1.1
description: Goldpinger is a tool to help debug, troubleshoot and visualize network connectivity and slowness issues.
home: https://github.com/bloomberg/goldpinger
sources:

View File

@@ -165,6 +165,12 @@ func (p *Pinger) Ping() {
if udpResult.AvgRttS > 0 {
ObservePeerUDPRtt(p.pod.HostIP, p.pod.PodIP, udpResult.AvgRttS)
}
if udpResult.Duplicates > 0 {
CountUDPDuplicates(p.pod.HostIP, p.pod.PodIP, udpResult.Duplicates)
}
if udpResult.OutOfOrder > 0 {
CountUDPOutOfOrder(p.pod.HostIP, p.pod.PodIP, udpResult.OutOfOrder)
}
}
if OK {

View File

@@ -167,6 +167,28 @@ var (
"host",
},
)
goldpingerUDPDuplicatesCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "goldpinger_udp_duplicates_total",
Help: "Count of duplicate UDP reply packets received",
},
[]string{
"goldpinger_instance",
"host_ip",
"pod_ip",
},
)
goldpingerUDPOutOfOrderCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "goldpinger_udp_out_of_order_total",
Help: "Count of out-of-order UDP reply packets received",
},
[]string{
"goldpinger_instance",
"host_ip",
"pod_ip",
},
)
bootTime = time.Now()
)
@@ -184,6 +206,8 @@ func init() {
prometheus.MustRegister(goldpingerPeersHopCount)
prometheus.MustRegister(goldpingerPeersUDPRtt)
prometheus.MustRegister(goldpingerUDPErrorsCounter)
prometheus.MustRegister(goldpingerUDPDuplicatesCounter)
prometheus.MustRegister(goldpingerUDPOutOfOrderCounter)
zap.L().Info("Metrics setup - see /metrics")
}
@@ -276,11 +300,15 @@ func SetPeerHopCount(hostIP, podIP string, hopCount int32) {
).Set(float64(hopCount))
}
// DeletePeerUDPMetrics removes stale UDP metric labels for a destroyed peer
// DeletePeerUDPMetrics removes stale UDP metric labels for a destroyed peer.
// This must be kept in sync with all per-peer UDP metrics to avoid stale
// label sets lingering in /metrics after a pod rolls.
func DeletePeerUDPMetrics(hostIP, podIP string) {
goldpingerPeersLossPct.DeleteLabelValues(GoldpingerConfig.Hostname, hostIP, podIP)
goldpingerPeersHopCount.DeleteLabelValues(GoldpingerConfig.Hostname, hostIP, podIP)
goldpingerPeersUDPRtt.DeleteLabelValues(GoldpingerConfig.Hostname, hostIP, podIP)
goldpingerUDPDuplicatesCounter.DeleteLabelValues(GoldpingerConfig.Hostname, hostIP, podIP)
goldpingerUDPOutOfOrderCounter.DeleteLabelValues(GoldpingerConfig.Hostname, hostIP, podIP)
}
// ObservePeerUDPRtt records a UDP RTT observation in seconds
@@ -300,6 +328,24 @@ func CountUDPError(host string) {
).Inc()
}
// CountUDPDuplicates adds to the duplicate packet counter for a peer
func CountUDPDuplicates(hostIP, podIP string, n int) {
goldpingerUDPDuplicatesCounter.WithLabelValues(
GoldpingerConfig.Hostname,
hostIP,
podIP,
).Add(float64(n))
}
// CountUDPOutOfOrder adds to the out-of-order packet counter for a peer
func CountUDPOutOfOrder(hostIP, podIP string, n int) {
goldpingerUDPOutOfOrderCounter.WithLabelValues(
GoldpingerConfig.Hostname,
hostIP,
podIP,
).Add(float64(n))
}
// returns a timer for easy observing of the durations of calls to kubernetes API
func GetLabeledKubernetesCallsTimer() *prometheus.Timer {
return prometheus.NewTimer(

View File

@@ -0,0 +1,69 @@
package goldpinger
import (
"testing"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
)
// TestDeletePeerUDPMetrics_CleansAllPerPeerMetrics verifies that
// DeletePeerUDPMetrics removes label sets from every per-peer UDP metric.
// If a new per-peer metric is added but not cleaned up in
// DeletePeerUDPMetrics, this test will fail.
func TestDeletePeerUDPMetrics_CleansAllPerPeerMetrics(t *testing.T) {
// Save and restore hostname since we set it for the test
origHostname := GoldpingerConfig.Hostname
GoldpingerConfig.Hostname = "test-instance"
defer func() { GoldpingerConfig.Hostname = origHostname }()
hostIP := "10.0.0.1"
podIP := "10.0.0.2"
// Populate all per-peer UDP metrics so they have label values
SetPeerLossPct(hostIP, podIP, 5.0)
SetPeerHopCount(hostIP, podIP, 2)
ObservePeerUDPRtt(hostIP, podIP, 0.001)
CountUDPDuplicates(hostIP, podIP, 1)
CountUDPOutOfOrder(hostIP, podIP, 1)
// Verify they exist before cleanup
perPeerCollectors := map[string]prometheus.Collector{
"goldpinger_peers_loss_pct": goldpingerPeersLossPct,
"goldpinger_peers_hop_count": goldpingerPeersHopCount,
"goldpinger_peers_udp_rtt_s": goldpingerPeersUDPRtt,
"goldpinger_udp_duplicates_total": goldpingerUDPDuplicatesCounter,
"goldpinger_udp_out_of_order_total": goldpingerUDPOutOfOrderCounter,
}
for name, collector := range perPeerCollectors {
if countMetrics(collector) == 0 {
t.Fatalf("metric %s has no label values before cleanup — test setup is broken", name)
}
}
// Run cleanup
DeletePeerUDPMetrics(hostIP, podIP)
// Verify all per-peer metrics are cleaned up
for name, collector := range perPeerCollectors {
if n := countMetrics(collector); n != 0 {
t.Errorf("metric %s still has %d label set(s) after DeletePeerUDPMetrics — add it to the cleanup function", name, n)
}
}
}
// countMetrics returns the number of metric families (label sets) for a collector.
func countMetrics(c prometheus.Collector) int {
ch := make(chan prometheus.Metric, 100)
c.Collect(ch)
close(ch)
count := 0
for m := range ch {
var d dto.Metric
if err := m.Write(&d); err == nil {
count++
}
}
return count
}

View File

@@ -40,10 +40,12 @@ const (
// UDPProbeResult holds the results of a UDP probe to a peer
type UDPProbeResult struct {
LossPct float64
HopCount int32
AvgRttS float64
Err error
LossPct float64
HopCount int32
AvgRttS float64
Duplicates int
OutOfOrder int
Err error
}
// StartUDPListener starts a UDP echo listener on the given port.
@@ -80,7 +82,51 @@ func StartUDPListener(port int) {
}
}
// ProbeUDP sends count UDP packets to the target and measures loss and hop count.
// recvState tracks state accumulated while receiving UDP probe replies.
type recvState struct {
received int
totalRttNs int64
ttlValue int
ttlFound bool
seen map[uint32]bool // sequence numbers already received
highestSeq int // highest sequence number seen so far (-1 = none)
duplicates int
outOfOrder int
}
// processPacket inspects a received packet and updates the receive state.
// Returns true if the packet was a valid, non-duplicate GPNG reply.
func (s *recvState) processPacket(buf []byte, n int, now time.Time) bool {
if n < udpHeaderSize {
return false
}
magic := binary.BigEndian.Uint32(buf[0:4])
if magic != udpMagic {
return false
}
seq := binary.BigEndian.Uint32(buf[4:8])
if s.seen[seq] {
s.duplicates++
return false
}
s.seen[seq] = true
seqInt := int(seq)
if s.highestSeq >= 0 && seqInt < s.highestSeq {
s.outOfOrder++
}
if seqInt > s.highestSeq {
s.highestSeq = seqInt
}
sentNs := int64(binary.BigEndian.Uint64(buf[8:16]))
s.totalRttNs += now.UnixNano() - sentNs
s.received++
return true
}
// ProbeUDP sends count UDP packets to the target and measures loss, hop count,
// RTT, and detects duplicate or out-of-order replies via sequence numbers.
func ProbeUDP(targetIP string, port, count, size int, timeout time.Duration) UDPProbeResult {
if count <= 0 {
return UDPProbeResult{Err: fmt.Errorf("packet count must be > 0, got %d", count)}
@@ -102,8 +148,6 @@ func ProbeUDP(targetIP string, port, count, size int, timeout time.Duration) UDP
// Determine if this is IPv4 or IPv6 and set up TTL/HopLimit reading
isIPv6 := net.ParseIP(targetIP).To4() == nil
var ttlValue int
ttlFound := false
if isIPv6 {
p := ipv6.NewPacketConn(conn.(*net.UDPConn))
@@ -132,78 +176,67 @@ func ProbeUDP(targetIP string, port, count, size int, timeout time.Duration) UDP
}
}
// Receive replies
received := 0
var totalRttNs int64
// Receive replies, tracking sequence numbers for duplicates/reordering
state := recvState{
seen: make(map[uint32]bool, count),
highestSeq: -1,
}
deadline := time.Now().Add(timeout)
conn.SetReadDeadline(deadline)
recvBuf := make([]byte, udpMaxPacketSize)
// We keep receiving until we have count unique replies or timeout.
// Duplicates don't count toward the received total, so we allow
// more iterations than count to handle them.
maxIter := count * 2
if isIPv6 {
p := ipv6.NewPacketConn(conn.(*net.UDPConn))
for received < count {
for i := 0; i < maxIter && state.received < count; i++ {
n, cm, _, err := p.ReadFrom(recvBuf)
now := time.Now()
if err != nil {
break
}
if n < udpHeaderSize {
continue
}
magic := binary.BigEndian.Uint32(recvBuf[0:4])
if magic != udpMagic {
continue
}
sentNs := int64(binary.BigEndian.Uint64(recvBuf[8:16]))
totalRttNs += now.UnixNano() - sentNs
received++
if cm != nil && cm.HopLimit > 0 && !ttlFound {
ttlValue = cm.HopLimit
ttlFound = true
state.processPacket(recvBuf[:n], n, now)
if cm != nil && cm.HopLimit > 0 && !state.ttlFound {
state.ttlValue = cm.HopLimit
state.ttlFound = true
}
}
} else {
p := ipv4.NewPacketConn(conn.(*net.UDPConn))
for received < count {
for i := 0; i < maxIter && state.received < count; i++ {
n, cm, _, err := p.ReadFrom(recvBuf)
now := time.Now()
if err != nil {
break
}
if n < udpHeaderSize {
continue
}
magic := binary.BigEndian.Uint32(recvBuf[0:4])
if magic != udpMagic {
continue
}
sentNs := int64(binary.BigEndian.Uint64(recvBuf[8:16]))
totalRttNs += now.UnixNano() - sentNs
received++
if cm != nil && cm.TTL > 0 && !ttlFound {
ttlValue = cm.TTL
ttlFound = true
state.processPacket(recvBuf[:n], n, now)
if cm != nil && cm.TTL > 0 && !state.ttlFound {
state.ttlValue = cm.TTL
state.ttlFound = true
}
}
}
lossPct := float64(count-received) / float64(count) * 100.0
lossPct := float64(count-state.received) / float64(count) * 100.0
var hopCount int32
if ttlFound {
hopCount = estimateHops(ttlValue)
if state.ttlFound {
hopCount = estimateHops(state.ttlValue)
}
var avgRttS float64
if received > 0 {
avgRttS = float64(totalRttNs) / float64(received) / 1e9
if state.received > 0 {
avgRttS = float64(state.totalRttNs) / float64(state.received) / 1e9
}
return UDPProbeResult{
LossPct: lossPct,
HopCount: hopCount,
AvgRttS: avgRttS,
LossPct: lossPct,
HopCount: hopCount,
AvgRttS: avgRttS,
Duplicates: state.duplicates,
OutOfOrder: state.outOfOrder,
}
}

View File

@@ -71,6 +71,76 @@ func startLossyEchoListener(t *testing.T, dropEveryN int) (int, func()) {
return port, func() { pc.Close() }
}
// startDuplicatingEchoListener echoes every packet twice, producing duplicates.
func startDuplicatingEchoListener(t *testing.T) (int, func()) {
t.Helper()
pc, err := net.ListenPacket("udp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
port := pc.LocalAddr().(*net.UDPAddr).Port
go func() {
buf := make([]byte, udpMaxPacketSize)
for {
n, addr, err := pc.ReadFrom(buf)
if err != nil {
return
}
if n >= udpHeaderSize {
magic := binary.BigEndian.Uint32(buf[0:4])
if magic == udpMagic {
pc.WriteTo(buf[:n], addr)
pc.WriteTo(buf[:n], addr) // duplicate
}
}
}
}()
return port, func() { pc.Close() }
}
// startReorderingEchoListener buffers two packets at a time and sends
// them back in reverse order, producing out-of-order replies.
func startReorderingEchoListener(t *testing.T) (int, func()) {
t.Helper()
pc, err := net.ListenPacket("udp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
port := pc.LocalAddr().(*net.UDPAddr).Port
go func() {
buf1 := make([]byte, udpMaxPacketSize)
buf2 := make([]byte, udpMaxPacketSize)
for {
// Read first packet
n1, addr1, err := pc.ReadFrom(buf1)
if err != nil {
return
}
// Read second packet
n2, addr2, err := pc.ReadFrom(buf2)
if err != nil {
// Got one but not two — send the first anyway
if n1 >= udpHeaderSize && binary.BigEndian.Uint32(buf1[0:4]) == udpMagic {
pc.WriteTo(buf1[:n1], addr1)
}
return
}
// Send them in reverse order
if n2 >= udpHeaderSize && binary.BigEndian.Uint32(buf2[0:4]) == udpMagic {
pc.WriteTo(buf2[:n2], addr2)
}
if n1 >= udpHeaderSize && binary.BigEndian.Uint32(buf1[0:4]) == udpMagic {
pc.WriteTo(buf1[:n1], addr1)
}
}
}()
return port, func() { pc.Close() }
}
func TestProbeUDP_NoLoss(t *testing.T) {
port, cleanup := startTestEchoListener(t)
defer cleanup()
@@ -159,6 +229,40 @@ func TestProbeUDP_PacketFormat(t *testing.T) {
}
}
func TestProbeUDP_Duplicates(t *testing.T) {
port, cleanup := startDuplicatingEchoListener(t)
defer cleanup()
result := ProbeUDP("127.0.0.1", port, 5, 64, 2*time.Second)
if result.Err != nil {
t.Fatalf("unexpected error: %v", result.Err)
}
if result.LossPct != 0 {
t.Errorf("expected 0%% loss, got %.1f%%", result.LossPct)
}
if result.Duplicates == 0 {
t.Error("expected duplicates > 0, got 0")
}
t.Logf("duplicates detected: %d", result.Duplicates)
}
func TestProbeUDP_OutOfOrder(t *testing.T) {
port, cleanup := startReorderingEchoListener(t)
defer cleanup()
result := ProbeUDP("127.0.0.1", port, 10, 64, 2*time.Second)
if result.Err != nil {
t.Fatalf("unexpected error: %v", result.Err)
}
if result.LossPct != 0 {
t.Errorf("expected 0%% loss, got %.1f%%", result.LossPct)
}
if result.OutOfOrder == 0 {
t.Error("expected out-of-order > 0, got 0")
}
t.Logf("out-of-order detected: %d, duplicates: %d", result.OutOfOrder, result.Duplicates)
}
func TestEstimateHops(t *testing.T) {
tests := []struct {
ttl int