mirror of
https://github.com/bloomberg/goldpinger.git
synced 2026-05-26 18:42:48 +00:00
Compare commits
4 Commits
goldpinger
...
v3.11.1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fdb70968b8 | ||
|
|
f4217e9241 | ||
|
|
7aaa34319c | ||
|
|
943609d9a5 |
2
Makefile
2
Makefile
@@ -1,5 +1,5 @@
|
||||
name ?= goldpinger
|
||||
version ?= v3.11.0
|
||||
version ?= v3.11.1
|
||||
bin ?= goldpinger
|
||||
pkg ?= "github.com/bloomberg/goldpinger"
|
||||
tag = $(name):$(version)
|
||||
|
||||
@@ -328,6 +328,8 @@ goldpinger_peers_loss_pct # gauge: UDP packet loss percentage (0-100)
|
||||
goldpinger_peers_hop_count # gauge: estimated hop count
|
||||
goldpinger_peers_udp_rtt_s # histogram: UDP round-trip time in seconds
|
||||
goldpinger_udp_errors_total # counter: UDP probe errors
|
||||
goldpinger_udp_duplicates_total # counter: duplicate UDP reply packets
|
||||
goldpinger_udp_out_of_order_total # counter: out-of-order UDP reply packets
|
||||
```
|
||||
|
||||
Links with partial loss are shown as yellow edges in the graph UI, and edge labels display the UDP RTT instead of HTTP latency when available.
|
||||
@@ -370,6 +372,8 @@ goldpinger_errors_total
|
||||
goldpinger_peers_loss_pct # (UDP probe, when enabled)
|
||||
goldpinger_peers_hop_count # (UDP probe, when enabled)
|
||||
goldpinger_peers_udp_rtt_s_* # (UDP probe, when enabled)
|
||||
goldpinger_udp_duplicates_total # (UDP probe, when enabled)
|
||||
goldpinger_udp_out_of_order_total # (UDP probe, when enabled)
|
||||
```
|
||||
|
||||
### Grafana
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
apiVersion: v1
|
||||
name: goldpinger
|
||||
appVersion: "3.11.0"
|
||||
version: 1.1.0
|
||||
appVersion: "3.11.1"
|
||||
version: 1.1.1
|
||||
description: Goldpinger is a tool to help debug, troubleshoot and visualize network connectivity and slowness issues.
|
||||
home: https://github.com/bloomberg/goldpinger
|
||||
sources:
|
||||
|
||||
@@ -165,6 +165,12 @@ func (p *Pinger) Ping() {
|
||||
if udpResult.AvgRttS > 0 {
|
||||
ObservePeerUDPRtt(p.pod.HostIP, p.pod.PodIP, udpResult.AvgRttS)
|
||||
}
|
||||
if udpResult.Duplicates > 0 {
|
||||
CountUDPDuplicates(p.pod.HostIP, p.pod.PodIP, udpResult.Duplicates)
|
||||
}
|
||||
if udpResult.OutOfOrder > 0 {
|
||||
CountUDPOutOfOrder(p.pod.HostIP, p.pod.PodIP, udpResult.OutOfOrder)
|
||||
}
|
||||
}
|
||||
|
||||
if OK {
|
||||
|
||||
@@ -167,6 +167,28 @@ var (
|
||||
"host",
|
||||
},
|
||||
)
|
||||
goldpingerUDPDuplicatesCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "goldpinger_udp_duplicates_total",
|
||||
Help: "Count of duplicate UDP reply packets received",
|
||||
},
|
||||
[]string{
|
||||
"goldpinger_instance",
|
||||
"host_ip",
|
||||
"pod_ip",
|
||||
},
|
||||
)
|
||||
goldpingerUDPOutOfOrderCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "goldpinger_udp_out_of_order_total",
|
||||
Help: "Count of out-of-order UDP reply packets received",
|
||||
},
|
||||
[]string{
|
||||
"goldpinger_instance",
|
||||
"host_ip",
|
||||
"pod_ip",
|
||||
},
|
||||
)
|
||||
bootTime = time.Now()
|
||||
)
|
||||
|
||||
@@ -184,6 +206,8 @@ func init() {
|
||||
prometheus.MustRegister(goldpingerPeersHopCount)
|
||||
prometheus.MustRegister(goldpingerPeersUDPRtt)
|
||||
prometheus.MustRegister(goldpingerUDPErrorsCounter)
|
||||
prometheus.MustRegister(goldpingerUDPDuplicatesCounter)
|
||||
prometheus.MustRegister(goldpingerUDPOutOfOrderCounter)
|
||||
zap.L().Info("Metrics setup - see /metrics")
|
||||
}
|
||||
|
||||
@@ -276,11 +300,15 @@ func SetPeerHopCount(hostIP, podIP string, hopCount int32) {
|
||||
).Set(float64(hopCount))
|
||||
}
|
||||
|
||||
// DeletePeerUDPMetrics removes stale UDP metric labels for a destroyed peer
|
||||
// DeletePeerUDPMetrics removes stale UDP metric labels for a destroyed peer.
|
||||
// This must be kept in sync with all per-peer UDP metrics to avoid stale
|
||||
// label sets lingering in /metrics after a pod rolls.
|
||||
func DeletePeerUDPMetrics(hostIP, podIP string) {
|
||||
goldpingerPeersLossPct.DeleteLabelValues(GoldpingerConfig.Hostname, hostIP, podIP)
|
||||
goldpingerPeersHopCount.DeleteLabelValues(GoldpingerConfig.Hostname, hostIP, podIP)
|
||||
goldpingerPeersUDPRtt.DeleteLabelValues(GoldpingerConfig.Hostname, hostIP, podIP)
|
||||
goldpingerUDPDuplicatesCounter.DeleteLabelValues(GoldpingerConfig.Hostname, hostIP, podIP)
|
||||
goldpingerUDPOutOfOrderCounter.DeleteLabelValues(GoldpingerConfig.Hostname, hostIP, podIP)
|
||||
}
|
||||
|
||||
// ObservePeerUDPRtt records a UDP RTT observation in seconds
|
||||
@@ -300,6 +328,24 @@ func CountUDPError(host string) {
|
||||
).Inc()
|
||||
}
|
||||
|
||||
// CountUDPDuplicates adds to the duplicate packet counter for a peer
|
||||
func CountUDPDuplicates(hostIP, podIP string, n int) {
|
||||
goldpingerUDPDuplicatesCounter.WithLabelValues(
|
||||
GoldpingerConfig.Hostname,
|
||||
hostIP,
|
||||
podIP,
|
||||
).Add(float64(n))
|
||||
}
|
||||
|
||||
// CountUDPOutOfOrder adds to the out-of-order packet counter for a peer
|
||||
func CountUDPOutOfOrder(hostIP, podIP string, n int) {
|
||||
goldpingerUDPOutOfOrderCounter.WithLabelValues(
|
||||
GoldpingerConfig.Hostname,
|
||||
hostIP,
|
||||
podIP,
|
||||
).Add(float64(n))
|
||||
}
|
||||
|
||||
// returns a timer for easy observing of the durations of calls to kubernetes API
|
||||
func GetLabeledKubernetesCallsTimer() *prometheus.Timer {
|
||||
return prometheus.NewTimer(
|
||||
|
||||
69
pkg/goldpinger/stats_test.go
Normal file
69
pkg/goldpinger/stats_test.go
Normal file
@@ -0,0 +1,69 @@
|
||||
package goldpinger
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
dto "github.com/prometheus/client_model/go"
|
||||
)
|
||||
|
||||
// TestDeletePeerUDPMetrics_CleansAllPerPeerMetrics verifies that
|
||||
// DeletePeerUDPMetrics removes label sets from every per-peer UDP metric.
|
||||
// If a new per-peer metric is added but not cleaned up in
|
||||
// DeletePeerUDPMetrics, this test will fail.
|
||||
func TestDeletePeerUDPMetrics_CleansAllPerPeerMetrics(t *testing.T) {
|
||||
// Save and restore hostname since we set it for the test
|
||||
origHostname := GoldpingerConfig.Hostname
|
||||
GoldpingerConfig.Hostname = "test-instance"
|
||||
defer func() { GoldpingerConfig.Hostname = origHostname }()
|
||||
|
||||
hostIP := "10.0.0.1"
|
||||
podIP := "10.0.0.2"
|
||||
|
||||
// Populate all per-peer UDP metrics so they have label values
|
||||
SetPeerLossPct(hostIP, podIP, 5.0)
|
||||
SetPeerHopCount(hostIP, podIP, 2)
|
||||
ObservePeerUDPRtt(hostIP, podIP, 0.001)
|
||||
CountUDPDuplicates(hostIP, podIP, 1)
|
||||
CountUDPOutOfOrder(hostIP, podIP, 1)
|
||||
|
||||
// Verify they exist before cleanup
|
||||
perPeerCollectors := map[string]prometheus.Collector{
|
||||
"goldpinger_peers_loss_pct": goldpingerPeersLossPct,
|
||||
"goldpinger_peers_hop_count": goldpingerPeersHopCount,
|
||||
"goldpinger_peers_udp_rtt_s": goldpingerPeersUDPRtt,
|
||||
"goldpinger_udp_duplicates_total": goldpingerUDPDuplicatesCounter,
|
||||
"goldpinger_udp_out_of_order_total": goldpingerUDPOutOfOrderCounter,
|
||||
}
|
||||
|
||||
for name, collector := range perPeerCollectors {
|
||||
if countMetrics(collector) == 0 {
|
||||
t.Fatalf("metric %s has no label values before cleanup — test setup is broken", name)
|
||||
}
|
||||
}
|
||||
|
||||
// Run cleanup
|
||||
DeletePeerUDPMetrics(hostIP, podIP)
|
||||
|
||||
// Verify all per-peer metrics are cleaned up
|
||||
for name, collector := range perPeerCollectors {
|
||||
if n := countMetrics(collector); n != 0 {
|
||||
t.Errorf("metric %s still has %d label set(s) after DeletePeerUDPMetrics — add it to the cleanup function", name, n)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// countMetrics returns the number of metric families (label sets) for a collector.
|
||||
func countMetrics(c prometheus.Collector) int {
|
||||
ch := make(chan prometheus.Metric, 100)
|
||||
c.Collect(ch)
|
||||
close(ch)
|
||||
count := 0
|
||||
for m := range ch {
|
||||
var d dto.Metric
|
||||
if err := m.Write(&d); err == nil {
|
||||
count++
|
||||
}
|
||||
}
|
||||
return count
|
||||
}
|
||||
@@ -40,10 +40,12 @@ const (
|
||||
|
||||
// UDPProbeResult holds the results of a UDP probe to a peer
|
||||
type UDPProbeResult struct {
|
||||
LossPct float64
|
||||
HopCount int32
|
||||
AvgRttS float64
|
||||
Err error
|
||||
LossPct float64
|
||||
HopCount int32
|
||||
AvgRttS float64
|
||||
Duplicates int
|
||||
OutOfOrder int
|
||||
Err error
|
||||
}
|
||||
|
||||
// StartUDPListener starts a UDP echo listener on the given port.
|
||||
@@ -80,7 +82,51 @@ func StartUDPListener(port int) {
|
||||
}
|
||||
}
|
||||
|
||||
// ProbeUDP sends count UDP packets to the target and measures loss and hop count.
|
||||
// recvState tracks state accumulated while receiving UDP probe replies.
|
||||
type recvState struct {
|
||||
received int
|
||||
totalRttNs int64
|
||||
ttlValue int
|
||||
ttlFound bool
|
||||
seen map[uint32]bool // sequence numbers already received
|
||||
highestSeq int // highest sequence number seen so far (-1 = none)
|
||||
duplicates int
|
||||
outOfOrder int
|
||||
}
|
||||
|
||||
// processPacket inspects a received packet and updates the receive state.
|
||||
// Returns true if the packet was a valid, non-duplicate GPNG reply.
|
||||
func (s *recvState) processPacket(buf []byte, n int, now time.Time) bool {
|
||||
if n < udpHeaderSize {
|
||||
return false
|
||||
}
|
||||
magic := binary.BigEndian.Uint32(buf[0:4])
|
||||
if magic != udpMagic {
|
||||
return false
|
||||
}
|
||||
seq := binary.BigEndian.Uint32(buf[4:8])
|
||||
if s.seen[seq] {
|
||||
s.duplicates++
|
||||
return false
|
||||
}
|
||||
s.seen[seq] = true
|
||||
|
||||
seqInt := int(seq)
|
||||
if s.highestSeq >= 0 && seqInt < s.highestSeq {
|
||||
s.outOfOrder++
|
||||
}
|
||||
if seqInt > s.highestSeq {
|
||||
s.highestSeq = seqInt
|
||||
}
|
||||
|
||||
sentNs := int64(binary.BigEndian.Uint64(buf[8:16]))
|
||||
s.totalRttNs += now.UnixNano() - sentNs
|
||||
s.received++
|
||||
return true
|
||||
}
|
||||
|
||||
// ProbeUDP sends count UDP packets to the target and measures loss, hop count,
|
||||
// RTT, and detects duplicate or out-of-order replies via sequence numbers.
|
||||
func ProbeUDP(targetIP string, port, count, size int, timeout time.Duration) UDPProbeResult {
|
||||
if count <= 0 {
|
||||
return UDPProbeResult{Err: fmt.Errorf("packet count must be > 0, got %d", count)}
|
||||
@@ -102,8 +148,6 @@ func ProbeUDP(targetIP string, port, count, size int, timeout time.Duration) UDP
|
||||
|
||||
// Determine if this is IPv4 or IPv6 and set up TTL/HopLimit reading
|
||||
isIPv6 := net.ParseIP(targetIP).To4() == nil
|
||||
var ttlValue int
|
||||
ttlFound := false
|
||||
|
||||
if isIPv6 {
|
||||
p := ipv6.NewPacketConn(conn.(*net.UDPConn))
|
||||
@@ -132,78 +176,67 @@ func ProbeUDP(targetIP string, port, count, size int, timeout time.Duration) UDP
|
||||
}
|
||||
}
|
||||
|
||||
// Receive replies
|
||||
received := 0
|
||||
var totalRttNs int64
|
||||
// Receive replies, tracking sequence numbers for duplicates/reordering
|
||||
state := recvState{
|
||||
seen: make(map[uint32]bool, count),
|
||||
highestSeq: -1,
|
||||
}
|
||||
deadline := time.Now().Add(timeout)
|
||||
conn.SetReadDeadline(deadline)
|
||||
|
||||
recvBuf := make([]byte, udpMaxPacketSize)
|
||||
|
||||
// We keep receiving until we have count unique replies or timeout.
|
||||
// Duplicates don't count toward the received total, so we allow
|
||||
// more iterations than count to handle them.
|
||||
maxIter := count * 2
|
||||
if isIPv6 {
|
||||
p := ipv6.NewPacketConn(conn.(*net.UDPConn))
|
||||
for received < count {
|
||||
for i := 0; i < maxIter && state.received < count; i++ {
|
||||
n, cm, _, err := p.ReadFrom(recvBuf)
|
||||
now := time.Now()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
if n < udpHeaderSize {
|
||||
continue
|
||||
}
|
||||
magic := binary.BigEndian.Uint32(recvBuf[0:4])
|
||||
if magic != udpMagic {
|
||||
continue
|
||||
}
|
||||
sentNs := int64(binary.BigEndian.Uint64(recvBuf[8:16]))
|
||||
totalRttNs += now.UnixNano() - sentNs
|
||||
received++
|
||||
if cm != nil && cm.HopLimit > 0 && !ttlFound {
|
||||
ttlValue = cm.HopLimit
|
||||
ttlFound = true
|
||||
state.processPacket(recvBuf[:n], n, now)
|
||||
if cm != nil && cm.HopLimit > 0 && !state.ttlFound {
|
||||
state.ttlValue = cm.HopLimit
|
||||
state.ttlFound = true
|
||||
}
|
||||
}
|
||||
} else {
|
||||
p := ipv4.NewPacketConn(conn.(*net.UDPConn))
|
||||
for received < count {
|
||||
for i := 0; i < maxIter && state.received < count; i++ {
|
||||
n, cm, _, err := p.ReadFrom(recvBuf)
|
||||
now := time.Now()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
if n < udpHeaderSize {
|
||||
continue
|
||||
}
|
||||
magic := binary.BigEndian.Uint32(recvBuf[0:4])
|
||||
if magic != udpMagic {
|
||||
continue
|
||||
}
|
||||
sentNs := int64(binary.BigEndian.Uint64(recvBuf[8:16]))
|
||||
totalRttNs += now.UnixNano() - sentNs
|
||||
received++
|
||||
if cm != nil && cm.TTL > 0 && !ttlFound {
|
||||
ttlValue = cm.TTL
|
||||
ttlFound = true
|
||||
state.processPacket(recvBuf[:n], n, now)
|
||||
if cm != nil && cm.TTL > 0 && !state.ttlFound {
|
||||
state.ttlValue = cm.TTL
|
||||
state.ttlFound = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
lossPct := float64(count-received) / float64(count) * 100.0
|
||||
lossPct := float64(count-state.received) / float64(count) * 100.0
|
||||
|
||||
var hopCount int32
|
||||
if ttlFound {
|
||||
hopCount = estimateHops(ttlValue)
|
||||
if state.ttlFound {
|
||||
hopCount = estimateHops(state.ttlValue)
|
||||
}
|
||||
|
||||
var avgRttS float64
|
||||
if received > 0 {
|
||||
avgRttS = float64(totalRttNs) / float64(received) / 1e9
|
||||
if state.received > 0 {
|
||||
avgRttS = float64(state.totalRttNs) / float64(state.received) / 1e9
|
||||
}
|
||||
|
||||
return UDPProbeResult{
|
||||
LossPct: lossPct,
|
||||
HopCount: hopCount,
|
||||
AvgRttS: avgRttS,
|
||||
LossPct: lossPct,
|
||||
HopCount: hopCount,
|
||||
AvgRttS: avgRttS,
|
||||
Duplicates: state.duplicates,
|
||||
OutOfOrder: state.outOfOrder,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -71,6 +71,76 @@ func startLossyEchoListener(t *testing.T, dropEveryN int) (int, func()) {
|
||||
return port, func() { pc.Close() }
|
||||
}
|
||||
|
||||
// startDuplicatingEchoListener echoes every packet twice, producing duplicates.
|
||||
func startDuplicatingEchoListener(t *testing.T) (int, func()) {
|
||||
t.Helper()
|
||||
pc, err := net.ListenPacket("udp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
port := pc.LocalAddr().(*net.UDPAddr).Port
|
||||
|
||||
go func() {
|
||||
buf := make([]byte, udpMaxPacketSize)
|
||||
for {
|
||||
n, addr, err := pc.ReadFrom(buf)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if n >= udpHeaderSize {
|
||||
magic := binary.BigEndian.Uint32(buf[0:4])
|
||||
if magic == udpMagic {
|
||||
pc.WriteTo(buf[:n], addr)
|
||||
pc.WriteTo(buf[:n], addr) // duplicate
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return port, func() { pc.Close() }
|
||||
}
|
||||
|
||||
// startReorderingEchoListener buffers two packets at a time and sends
|
||||
// them back in reverse order, producing out-of-order replies.
|
||||
func startReorderingEchoListener(t *testing.T) (int, func()) {
|
||||
t.Helper()
|
||||
pc, err := net.ListenPacket("udp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
port := pc.LocalAddr().(*net.UDPAddr).Port
|
||||
|
||||
go func() {
|
||||
buf1 := make([]byte, udpMaxPacketSize)
|
||||
buf2 := make([]byte, udpMaxPacketSize)
|
||||
for {
|
||||
// Read first packet
|
||||
n1, addr1, err := pc.ReadFrom(buf1)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
// Read second packet
|
||||
n2, addr2, err := pc.ReadFrom(buf2)
|
||||
if err != nil {
|
||||
// Got one but not two — send the first anyway
|
||||
if n1 >= udpHeaderSize && binary.BigEndian.Uint32(buf1[0:4]) == udpMagic {
|
||||
pc.WriteTo(buf1[:n1], addr1)
|
||||
}
|
||||
return
|
||||
}
|
||||
// Send them in reverse order
|
||||
if n2 >= udpHeaderSize && binary.BigEndian.Uint32(buf2[0:4]) == udpMagic {
|
||||
pc.WriteTo(buf2[:n2], addr2)
|
||||
}
|
||||
if n1 >= udpHeaderSize && binary.BigEndian.Uint32(buf1[0:4]) == udpMagic {
|
||||
pc.WriteTo(buf1[:n1], addr1)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return port, func() { pc.Close() }
|
||||
}
|
||||
|
||||
func TestProbeUDP_NoLoss(t *testing.T) {
|
||||
port, cleanup := startTestEchoListener(t)
|
||||
defer cleanup()
|
||||
@@ -159,6 +229,40 @@ func TestProbeUDP_PacketFormat(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestProbeUDP_Duplicates(t *testing.T) {
|
||||
port, cleanup := startDuplicatingEchoListener(t)
|
||||
defer cleanup()
|
||||
|
||||
result := ProbeUDP("127.0.0.1", port, 5, 64, 2*time.Second)
|
||||
if result.Err != nil {
|
||||
t.Fatalf("unexpected error: %v", result.Err)
|
||||
}
|
||||
if result.LossPct != 0 {
|
||||
t.Errorf("expected 0%% loss, got %.1f%%", result.LossPct)
|
||||
}
|
||||
if result.Duplicates == 0 {
|
||||
t.Error("expected duplicates > 0, got 0")
|
||||
}
|
||||
t.Logf("duplicates detected: %d", result.Duplicates)
|
||||
}
|
||||
|
||||
func TestProbeUDP_OutOfOrder(t *testing.T) {
|
||||
port, cleanup := startReorderingEchoListener(t)
|
||||
defer cleanup()
|
||||
|
||||
result := ProbeUDP("127.0.0.1", port, 10, 64, 2*time.Second)
|
||||
if result.Err != nil {
|
||||
t.Fatalf("unexpected error: %v", result.Err)
|
||||
}
|
||||
if result.LossPct != 0 {
|
||||
t.Errorf("expected 0%% loss, got %.1f%%", result.LossPct)
|
||||
}
|
||||
if result.OutOfOrder == 0 {
|
||||
t.Error("expected out-of-order > 0, got 0")
|
||||
}
|
||||
t.Logf("out-of-order detected: %d, duplicates: %d", result.OutOfOrder, result.Duplicates)
|
||||
}
|
||||
|
||||
func TestEstimateHops(t *testing.T) {
|
||||
tests := []struct {
|
||||
ttl int
|
||||
|
||||
Reference in New Issue
Block a user