Add option to enforce strict monotonicy in metrics.

This change adds the new option `force_monotonicy` to metric
configurations. It is intended for almost-but-not-really monotinic
sources, such as counters which reset when the sensor is restarted.

When this option is set to `true`, the source metric value is regularly
written to disk. This allows us to detect and compensate counter resets
even between restarts. When a reset is detected, the last value before
the reset becomes the new offset, which is added to the metric value
going forth.  The result is a strictly monotonic time series, like an
ever increasing counter.
This commit is contained in:
Christian Schneider
2023-12-06 10:12:23 +01:00
committed by Christian Schneider
parent 5c1917ff68
commit 4943c97963
6 changed files with 261 additions and 11 deletions

View File

@@ -182,6 +182,8 @@ cache:
# Set the timeout to -1 to disable the deletion of metrics from the cache. The exporter presents the ingest timestamp # Set the timeout to -1 to disable the deletion of metrics from the cache. The exporter presents the ingest timestamp
# to prometheus. # to prometheus.
timeout: 24h timeout: 24h
# Path to the directory to keep the state for monotonic metrics.
state_directory: "/var/lib/mqtt2prometheus"
json_parsing: json_parsing:
# Separator. Used to split path to elements when accessing json fields. # Separator. Used to split path to elements when accessing json fields.
# You can access json fields with dots in it. F.E. {"key.name": {"nested": "value"}} # You can access json fields with dots in it. F.E. {"key.name": {"nested": "value"}}
@@ -248,6 +250,18 @@ metrics:
# Metric value to use if a match cannot be found in the map above. # Metric value to use if a match cannot be found in the map above.
# If not specified, parsing error will occur. # If not specified, parsing error will occur.
error_value: 1 error_value: 1
# The name of the metric in prometheus
- prom_name: total_energy
# The name of the metric in a MQTT JSON message
mqtt_name: aenergy.total
# Regular expression to only match sensors with the given name pattern
sensor_name_filter: "^shellyplus1pm-.*$"
# The prometheus help text for this metric
help: Total energy used
# The prometheus type for this metric. Valid values are: "gauge" and "counter"
type: counter
# This setting requires an almost monotonic counter as the source. When monotonicy is enforced, the metric value is regularly written to disk. Thus, resets in the source counter can be detected and corrected by adding an offset as if the reset did not happen. The result is a strict monotonic increasing time series, like an ever growing counter.
force_monotonicy: true
``` ```

View File

@@ -227,7 +227,7 @@ func setupGoKitLogger(l *zap.Logger) log.Logger {
} }
func setupExtractor(cfg config.Config) (metrics.Extractor, error) { func setupExtractor(cfg config.Config) (metrics.Extractor, error) {
parser := metrics.NewParser(cfg.Metrics, cfg.JsonParsing.Separator) parser := metrics.NewParser(cfg.Metrics, cfg.JsonParsing.Separator, cfg.Cache.StateDir)
if cfg.MQTT.ObjectPerTopicConfig != nil { if cfg.MQTT.ObjectPerTopicConfig != nil {
switch cfg.MQTT.ObjectPerTopicConfig.Encoding { switch cfg.MQTT.ObjectPerTopicConfig.Encoding {
case config.EncodingJSON: case config.EncodingJSON:

View File

@@ -3,6 +3,7 @@ package config
import ( import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"os"
"regexp" "regexp"
"time" "time"
@@ -26,7 +27,8 @@ var MQTTConfigDefaults = MQTTConfig{
} }
var CacheConfigDefaults = CacheConfig{ var CacheConfigDefaults = CacheConfig{
Timeout: 2 * time.Minute, Timeout: 2 * time.Minute,
StateDir: "/var/lib/mqtt2prometheus",
} }
var JsonParsingConfigDefaults = JsonParsingConfig{ var JsonParsingConfigDefaults = JsonParsingConfig{
@@ -94,7 +96,8 @@ type Config struct {
} }
type CacheConfig struct { type CacheConfig struct {
Timeout time.Duration `yaml:"timeout"` Timeout time.Duration `yaml:"timeout"`
StateDir string `yaml:"state_directory"`
} }
type JsonParsingConfig struct { type JsonParsingConfig struct {
@@ -135,6 +138,7 @@ type MetricConfig struct {
Help string `yaml:"help"` Help string `yaml:"help"`
ValueType string `yaml:"type"` ValueType string `yaml:"type"`
OmitTimestamp bool `yaml:"omit_timestamp"` OmitTimestamp bool `yaml:"omit_timestamp"`
ForceMonotonicy bool `yaml:"force_monotonicy"`
ConstantLabels map[string]string `yaml:"const_labels"` ConstantLabels map[string]string `yaml:"const_labels"`
StringValueMapping *StringValueMappingConfig `yaml:"string_value_mapping"` StringValueMapping *StringValueMappingConfig `yaml:"string_value_mapping"`
MQTTValueScale float64 `yaml:"mqtt_value_scale"` MQTTValueScale float64 `yaml:"mqtt_value_scale"`
@@ -179,6 +183,9 @@ func LoadConfig(configFile string) (Config, error) {
if cfg.Cache == nil { if cfg.Cache == nil {
cfg.Cache = &CacheConfigDefaults cfg.Cache = &CacheConfigDefaults
} }
if cfg.Cache.StateDir == "" {
cfg.Cache.StateDir = CacheConfigDefaults.StateDir
}
if cfg.JsonParsing == nil { if cfg.JsonParsing == nil {
cfg.JsonParsing = &JsonParsingConfigDefaults cfg.JsonParsing = &JsonParsingConfigDefaults
} }
@@ -217,5 +224,18 @@ func LoadConfig(configFile string) (Config, error) {
} }
} }
// If any metric forces monotonicy, we need a state directory.
forcesMonotonicy := false
for _, m := range cfg.Metrics {
if m.ForceMonotonicy {
forcesMonotonicy = true
}
}
if forcesMonotonicy {
if err := os.MkdirAll(cfg.Cache.StateDir, 0755); err != nil {
return Config{}, err
}
}
return cfg, nil return cfg, nil
} }

View File

@@ -2,6 +2,7 @@ package metrics
import ( import (
"fmt" "fmt"
"regexp"
"github.com/hikhvar/mqtt2prometheus/pkg/config" "github.com/hikhvar/mqtt2prometheus/pkg/config"
gojsonq "github.com/thedevsaddam/gojsonq/v2" gojsonq "github.com/thedevsaddam/gojsonq/v2"
@@ -9,6 +10,16 @@ import (
type Extractor func(topic string, payload []byte, deviceID string) (MetricCollection, error) type Extractor func(topic string, payload []byte, deviceID string) (MetricCollection, error)
// metricID returns a deterministic identifier per metic config which is safe to use in a file path.
func metricID(topic, metric, deviceID, promName string) string {
re := regexp.MustCompile(`[^a-zA-Z0-9]`)
deviceID = re.ReplaceAllString(deviceID, "_")
topic = re.ReplaceAllString(topic, "_")
metric = re.ReplaceAllString(metric, "_")
promName = re.ReplaceAllString(promName, "_")
return fmt.Sprintf("%s-%s-%s-%s", deviceID, topic, metric, promName)
}
func NewJSONObjectExtractor(p Parser) Extractor { func NewJSONObjectExtractor(p Parser) Extractor {
return func(topic string, payload []byte, deviceID string) (MetricCollection, error) { return func(topic string, payload []byte, deviceID string) (MetricCollection, error) {
var mc MetricCollection var mc MetricCollection
@@ -27,7 +38,8 @@ func NewJSONObjectExtractor(p Parser) Extractor {
continue continue
} }
m, err := p.parseMetric(config, rawValue) id := metricID(topic, path, deviceID, config.PrometheusName)
m, err := p.parseMetric(config, id, rawValue)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to parse valid metric value: %w", err) return nil, fmt.Errorf("failed to parse valid metric value: %w", err)
} }
@@ -63,7 +75,8 @@ func NewMetricPerTopicExtractor(p Parser, metricNameRegex *config.Regexp) Extrac
rawValue = string(payload) rawValue = string(payload)
} }
m, err := p.parseMetric(config, rawValue) id := metricID(topic, metricName, deviceID, config.PrometheusName)
m, err := p.parseMetric(config, id, rawValue)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to parse metric: %w", err) return nil, fmt.Errorf("failed to parse metric: %w", err)
} }

View File

@@ -2,22 +2,44 @@ package metrics
import ( import (
"fmt" "fmt"
"os"
"strconv" "strconv"
"strings"
"time" "time"
"github.com/hikhvar/mqtt2prometheus/pkg/config" "github.com/hikhvar/mqtt2prometheus/pkg/config"
"gopkg.in/yaml.v2"
) )
// monotonicState holds the runtime information to realize a monotonic increasing value.
type monotonicState struct {
// Basline value to add to each parsed metric value to maintain monotonicy
Offset float64 `yaml:"value_offset"`
// Last value that was parsed before the offset was added
LastRawValue float64 `yaml:"last_raw_value"`
}
// metricState holds runtime information per metric configuration.
type metricState struct {
monotonic monotonicState
// The last time the state file was written
lastWritten time.Time
}
type Parser struct { type Parser struct {
separator string separator string
// Maps the mqtt metric name to a list of configs // Maps the mqtt metric name to a list of configs
// The first that matches SensorNameFilter will be used // The first that matches SensorNameFilter will be used
metricConfigs map[string][]config.MetricConfig metricConfigs map[string][]config.MetricConfig
// Directory holding state files
stateDir string
// Per-metric state
states map[string]*metricState
} }
var now = time.Now var now = time.Now
func NewParser(metrics []config.MetricConfig, separator string) Parser { func NewParser(metrics []config.MetricConfig, separator, stateDir string) Parser {
cfgs := make(map[string][]config.MetricConfig) cfgs := make(map[string][]config.MetricConfig)
for i := range metrics { for i := range metrics {
key := metrics[i].MQTTName key := metrics[i].MQTTName
@@ -26,6 +48,8 @@ func NewParser(metrics []config.MetricConfig, separator string) Parser {
return Parser{ return Parser{
separator: separator, separator: separator,
metricConfigs: cfgs, metricConfigs: cfgs,
stateDir: strings.TrimRight(stateDir, "/"),
states: make(map[string]*metricState),
} }
} }
@@ -47,7 +71,7 @@ func (p *Parser) findMetricConfig(metric string, deviceID string) (config.Metric
// parseMetric parses the given value according to the given deviceID and metricPath. The config allows to // parseMetric parses the given value according to the given deviceID and metricPath. The config allows to
// parse a metric value according to the device ID. // parse a metric value according to the device ID.
func (p *Parser) parseMetric(cfg config.MetricConfig, value interface{}) (Metric, error) { func (p *Parser) parseMetric(cfg config.MetricConfig, metricID string, value interface{}) (Metric, error) {
var metricValue float64 var metricValue float64
if boolValue, ok := value.(bool); ok { if boolValue, ok := value.(bool); ok {
@@ -87,6 +111,22 @@ func (p *Parser) parseMetric(cfg config.MetricConfig, value interface{}) (Metric
return Metric{}, fmt.Errorf("got data with unexpectd type: %T ('%s')", value, value) return Metric{}, fmt.Errorf("got data with unexpectd type: %T ('%s')", value, value)
} }
if cfg.ForceMonotonicy {
ms, err := p.getMetricState(metricID)
if err != nil {
return Metric{}, err
}
// When the source metric is reset, the last adjusted value becomes the new offset.
if metricValue < ms.monotonic.LastRawValue {
ms.monotonic.Offset += ms.monotonic.LastRawValue
// Trigger flushing the new state to disk.
ms.lastWritten = time.Time{}
}
ms.monotonic.LastRawValue = metricValue
metricValue += ms.monotonic.Offset
}
if cfg.MQTTValueScale != 0 { if cfg.MQTTValueScale != 0 {
metricValue = metricValue * cfg.MQTTValueScale metricValue = metricValue * cfg.MQTTValueScale
} }
@@ -103,3 +143,53 @@ func (p *Parser) parseMetric(cfg config.MetricConfig, value interface{}) (Metric
IngestTime: ingestTime, IngestTime: ingestTime,
}, nil }, nil
} }
func (p *Parser) stateFileName(metricID string) string {
return fmt.Sprintf("%s/%s.yaml", p.stateDir, metricID)
}
// readMetricState parses the metric state from the configured path.
// If the file does not exist, an empty state is returned.
func (p *Parser) readMetricState(metricID string) (*metricState, error) {
data, err := os.ReadFile(p.stateFileName(metricID))
state := &metricState{}
if err != nil {
// The file does not exist for new metrics.
if os.IsNotExist(err) {
return state, nil
}
return state, err
}
err = yaml.UnmarshalStrict(data, &state.monotonic)
state.lastWritten = now()
return state, err
}
// writeMetricState writes back the metric's current state to the configured path.
func (p *Parser) writeMetricState(metricID string, state *metricState) error {
out, err := yaml.Marshal(state.monotonic)
if err != nil {
return err
}
return os.WriteFile(p.stateFileName(metricID), out, 0644)
}
// getMetricState returns the state of the given metric.
// The state is read from and written back to disk as needed.
func (p *Parser) getMetricState(metricID string) (*metricState, error) {
var err error
state, found := p.states[metricID]
if !found {
if state, err = p.readMetricState(metricID); err != nil {
return nil, err
}
p.states[metricID] = state
}
// Write the state back to disc every minute.
if now().Sub(state.lastWritten) >= time.Minute {
if err = p.writeMetricState(metricID, state); err == nil {
state.lastWritten = now()
}
}
return state, err
}

View File

@@ -1,6 +1,7 @@
package metrics package metrics
import ( import (
"os"
"reflect" "reflect"
"testing" "testing"
"time" "time"
@@ -10,6 +11,12 @@ import (
) )
func TestParser_parseMetric(t *testing.T) { func TestParser_parseMetric(t *testing.T) {
stateDir, err := os.MkdirTemp("", "parser_test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(stateDir)
now = testNow now = testNow
type fields struct { type fields struct {
metricConfigs map[string][]config.MetricConfig metricConfigs map[string][]config.MetricConfig
@@ -415,12 +422,111 @@ func TestParser_parseMetric(t *testing.T) {
}, },
wantErr: true, wantErr: true,
}, },
{
name: "monotonic gauge, step 1: initial value",
fields: fields{
map[string][]config.MetricConfig{
"aenergy.total": []config.MetricConfig{
{
PrometheusName: "total_energy",
ValueType: "gauge",
OmitTimestamp: true,
ForceMonotonicy: true,
},
},
},
},
args: args{
metricPath: "aenergy.total",
deviceID: "shellyplus1pm-foo",
value: 1.0,
},
want: Metric{
Description: prometheus.NewDesc("total_energy", "", []string{"sensor", "topic"}, nil),
ValueType: prometheus.GaugeValue,
Value: 1.0,
},
},
{
name: "monotonic gauge, step 2: monotonic increase does not add offset",
fields: fields{
map[string][]config.MetricConfig{
"aenergy.total": []config.MetricConfig{
{
PrometheusName: "total_energy",
ValueType: "gauge",
OmitTimestamp: true,
ForceMonotonicy: true,
},
},
},
},
args: args{
metricPath: "aenergy.total",
deviceID: "shellyplus1pm-foo",
value: 2.0,
},
want: Metric{
Description: prometheus.NewDesc("total_energy", "", []string{"sensor", "topic"}, nil),
ValueType: prometheus.GaugeValue,
Value: 2.0,
},
},
{
name: "monotonic gauge, step 3: raw metric is reset, last value becomes the new offset",
fields: fields{
map[string][]config.MetricConfig{
"aenergy.total": []config.MetricConfig{
{
PrometheusName: "total_energy",
ValueType: "gauge",
OmitTimestamp: true,
ForceMonotonicy: true,
},
},
},
},
args: args{
metricPath: "aenergy.total",
deviceID: "shellyplus1pm-foo",
value: 0.0,
},
want: Metric{
Description: prometheus.NewDesc("total_energy", "", []string{"sensor", "topic"}, nil),
ValueType: prometheus.GaugeValue,
Value: 2.0,
},
},
{
name: "monotonic gauge, step 4: monotonic increase with offset",
fields: fields{
map[string][]config.MetricConfig{
"aenergy.total": []config.MetricConfig{
{
PrometheusName: "total_energy",
ValueType: "gauge",
OmitTimestamp: true,
ForceMonotonicy: true,
},
},
},
},
args: args{
metricPath: "aenergy.total",
deviceID: "shellyplus1pm-foo",
value: 1.0,
},
want: Metric{
Description: prometheus.NewDesc("total_energy", "", []string{"sensor", "topic"}, nil),
ValueType: prometheus.GaugeValue,
Value: 3.0,
},
},
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
p := &Parser{ p := NewParser(nil, config.JsonParsingConfigDefaults.Separator, stateDir)
metricConfigs: tt.fields.metricConfigs, p.metricConfigs = tt.fields.metricConfigs
}
// Find a valid metrics config // Find a valid metrics config
config, found := p.findMetricConfig(tt.args.metricPath, tt.args.deviceID) config, found := p.findMetricConfig(tt.args.metricPath, tt.args.deviceID)
@@ -431,7 +537,8 @@ func TestParser_parseMetric(t *testing.T) {
return return
} }
got, err := p.parseMetric(config, tt.args.value) id := metricID("", tt.args.metricPath, tt.args.deviceID, config.PrometheusName)
got, err := p.parseMetric(config, id, tt.args.value)
if (err != nil) != tt.wantErr { if (err != nil) != tt.wantErr {
t.Errorf("parseMetric() error = %v, wantErr %v", err, tt.wantErr) t.Errorf("parseMetric() error = %v, wantErr %v", err, tt.wantErr)
return return
@@ -439,6 +546,12 @@ func TestParser_parseMetric(t *testing.T) {
if !reflect.DeepEqual(got, tt.want) { if !reflect.DeepEqual(got, tt.want) {
t.Errorf("parseMetric() got = %v, want %v", got, tt.want) t.Errorf("parseMetric() got = %v, want %v", got, tt.want)
} }
if config.ForceMonotonicy {
if err = p.writeMetricState(id, p.states[id]); err != nil {
t.Errorf("failed to write metric state: %v", err)
}
}
}) })
} }
} }