From ce77333416421123dcdd7c64ca529efadd1c1eac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20LOYET?= <822436+fatpat@users.noreply.github.com> Date: Thu, 26 Dec 2024 09:31:25 +0100 Subject: [PATCH] add dynamic labels --- Readme.md | 50 ++++++++++++++++++------------ pkg/config/config.go | 14 ++++++++- pkg/metrics/collector.go | 12 ++++++-- pkg/metrics/parser.go | 62 ++++++++++++++++++++++++++++++++++++++ pkg/metrics/parser_test.go | 28 +++++++++++++++++ 5 files changed, 144 insertions(+), 22 deletions(-) diff --git a/Readme.md b/Readme.md index fa4cc2d..29b25f9 100644 --- a/Readme.md +++ b/Readme.md @@ -3,14 +3,14 @@ This exporter translates from MQTT topics to prometheus metrics. The core design is that clients send arbitrary JSON messages -on the topics. The translation between the MQTT representation and prometheus metrics is configured in the mqtt2prometheus exporter since we often can not change the IoT devices sending +on the topics. The translation between the MQTT representation and prometheus metrics is configured in the mqtt2prometheus exporter since we often can not change the IoT devices sending the messages. Clients can push metrics via MQTT to an MQTT Broker. This exporter subscribes to the broker and expose the received messages as prometheus metrics. Currently, the exporter supports only MQTT 3.1. ![Overview Diagram](docs/overview.drawio.svg) I wrote this exporter to expose metrics from small embedded sensors based on the NodeMCU to prometheus. -The used arduino sketch can be found in the [dht22tomqtt](https://github.com/hikhvar/dht22tomqtt) repository. +The used arduino sketch can be found in the [dht22tomqtt](https://github.com/hikhvar/dht22tomqtt) repository. A local hacking environment with mqtt2prometheus, a MQTT broker and a prometheus server is in the [hack](https://github.com/hikhvar/mqtt2prometheus/tree/master/hack) directory. ## Assumptions about Messages and Topics @@ -36,7 +36,7 @@ The label `sensor` is extracted with the default `device_id_regex` `(.*/)?(?P.*)" # Optional: Configures mqtt2prometheus to expect an object containing multiple metrics to be published as the value on an mqtt topic. - # This is the default. + # This is the default. object_per_topic_config: # The encoding of the object, currently only json is supported encoding: JSON @@ -202,6 +202,10 @@ metrics: # A map of string to string for constant labels. This labels will be attached to every prometheus metric const_labels: sensor_type: dht22 + # A map of string to expression for dynamic labels. This labels will be attached to every prometheus metric + # expression will be executed for each label every time a metric is processed + # dynamic_labels: + # raw_value: "raw_value" # The name of the metric in prometheus - prom_name: humidity # The name of the metric in a MQTT JSON message @@ -310,7 +314,7 @@ Having the MQTT login details in the config file runs the risk of publishing the Create a file to store your login details, for example at `~/secrets/mqtt2prom`: ```SHELL #!/bin/bash -export MQTT2PROM_MQTT_USER="myUser" +export MQTT2PROM_MQTT_USER="myUser" export MQTT2PROM_MQTT_PASSWORD="superpassword" ``` @@ -330,9 +334,9 @@ Then load that file into the environment before starting the container: Create a docker secret to store the password(`mqtt-credential` in the example below), and pass the optional `treat-mqtt-password-as-file-name` command line argument. ```docker mqtt_exporter_tasmota: - image: ghcr.io/hikhvar/mqtt2prometheus:latest + image: ghcr.io/hikhvar/mqtt2prometheus:latest secrets: - - mqtt-credential + - mqtt-credential environment: - MQTT2PROM_MQTT_USER=mqtt - MQTT2PROM_MQTT_PASSWORD=/var/run/secrets/mqtt-credential @@ -346,6 +350,9 @@ Create a docker secret to store the password(`mqtt-credential` in the example be ### Expressions +Expression is a peace of code that is run dynamically for calculate metric value or generate dynamic labels. + +#### Metric value Metric values can be derived from sensor inputs using complex expressions. Set the metric config option `raw_expression` or `expression` to the desired formular to calculate the result from the input. `raw_expression` and `expression` are mutually exclusives: * `raw_expression` is run without raw value conversion. It's `raw_expression` duty to handle the conversion. Only `raw_value` is set while `value` is always set to 0.0. Here is an example which convert datetime (format `HYYMMDDhhmmss`) to unix timestamp: ```yaml @@ -356,11 +363,16 @@ raw_expression: 'date(string(raw_value), "H060102150405", "Europe/Paris").Unix() expression: "value > 0 ? last_result + value * elapsed.Seconds() : last_result" ``` +#### Dynamic labels +Dynamic labels are derivated from sensor inputs using complex expressions. Define labels and the corresponding expression in the metric config otpion `dynamic_labels`. +`raw_value` and `value` are both set in this context. The value returned from dynamic labels expression is not typed and will be converted to string before being exported. + +#### Expression During the evaluation, the following variables are available to the expression: * `raw_value` - the raw MQTT sensor value (without any conversion) * `value` - the current sensor value (after string-value mapping, if configured) * `last_value` - the `value` during the previous expression evaluation -* `last_result` - the result from the previous expression evaluation +* `last_result` - the result from the previous expression evaluation (a float for `raw_expression`/`expression`, a string for `dynamic_labels`) * `elapsed` - the time that passed since the previous evaluation, as a [Duration](https://pkg.go.dev/time#Duration) value The [language definition](https://expr-lang.org/docs/v1.9/Language-Definition) describes the expression syntax. In addition, the following functions are available: @@ -391,7 +403,7 @@ If `raw_expression` is set, the generated value of the expression is exported to ## Frequently Asked Questions ### Listen to multiple Topic Pathes -The exporter can only listen to one topic_path per instance. If you have to listen to two different topic_paths it is +The exporter can only listen to one topic_path per instance. If you have to listen to two different topic_paths it is recommended to run two instances of the mqtt2prometheus exporter. You can run both on the same host or if you run in Kubernetes, even in the same pod. @@ -411,7 +423,7 @@ heat_index{sensor="storage",topic="devices/workshop/storage"} 15.92 humidity{sensor="storage",topic="devices/workshop/storage"} 34.60 ``` -The following prometheus [relabel_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config) will extract the location from the topic path as well and attaches the `location` label. +The following prometheus [relabel_config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config) will extract the location from the topic path as well and attaches the `location` label. ```yaml relabel_config: - source_labels: [ "topic" ] diff --git a/pkg/config/config.go b/pkg/config/config.go index abf9572..21b5d74 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -5,6 +5,7 @@ import ( "io/ioutil" "os" "regexp" + "sort" "time" "github.com/prometheus/client_golang/prometheus" @@ -144,6 +145,7 @@ type MetricConfig struct { Expression string `yaml:"expression"` ForceMonotonicy bool `yaml:"force_monotonicy"` ConstantLabels map[string]string `yaml:"const_labels"` + DynamicLabels map[string]string `yaml:"dynamic_labels"` StringValueMapping *StringValueMappingConfig `yaml:"string_value_mapping"` MQTTValueScale float64 `yaml:"mqtt_value_scale"` // ErrorValue is used while error during value parsing @@ -159,8 +161,9 @@ type StringValueMappingConfig struct { } func (mc *MetricConfig) PrometheusDescription() *prometheus.Desc { + labels := append([]string{"sensor", "topic"}, mc.DynamicLabelsKeys()...) return prometheus.NewDesc( - mc.PrometheusName, mc.Help, []string{"sensor", "topic"}, mc.ConstantLabels, + mc.PrometheusName, mc.Help, labels, mc.ConstantLabels, ) } @@ -175,6 +178,15 @@ func (mc *MetricConfig) PrometheusValueType() prometheus.ValueType { } } +func (mc *MetricConfig) DynamicLabelsKeys() []string { + var labels []string + for k := range mc.DynamicLabels { + labels = append(labels, k) + } + sort.Strings(labels) + return labels +} + func LoadConfig(configFile string, logger *zap.Logger) (Config, error) { configData, err := ioutil.ReadFile(configFile) if err != nil { diff --git a/pkg/metrics/collector.go b/pkg/metrics/collector.go index 188957a..61fcc40 100644 --- a/pkg/metrics/collector.go +++ b/pkg/metrics/collector.go @@ -27,6 +27,8 @@ type Metric struct { ValueType prometheus.ValueType IngestTime time.Time Topic string + Labels map[string]string + LabelsKeys []string } type CacheItem struct { @@ -71,12 +73,18 @@ func (c *MemoryCachedCollector) Collect(mc chan<- prometheus.Metric) { if metric.Description == nil { c.logger.Warn("empty description", zap.String("topic", metric.Topic), zap.Float64("value", metric.Value)) } + + // set dynamic labels with the right order starting with "sensor" and "topic" + labels := []string{device, metric.Topic} + for _, k := range metric.LabelsKeys { + labels = append(labels, metric.Labels[k]) + } + m := prometheus.MustNewConstMetric( metric.Description, metric.ValueType, metric.Value, - device, - metric.Topic, + labels..., ) if metric.IngestTime.IsZero() { diff --git a/pkg/metrics/parser.go b/pkg/metrics/parser.go index 4877384..f7493c8 100644 --- a/pkg/metrics/parser.go +++ b/pkg/metrics/parser.go @@ -27,6 +27,8 @@ type dynamicState struct { LastExprRawValue interface{} `yaml:"last_expr_raw_value"` // Last result returned from evaluating the given expression LastExprResult float64 `yaml:"last_expr_result"` + // Last result (String) returned from evaluating the given expression + LastExprResultString string `yaml:"last_expr_result_string"` // Last result returned from evaluating the given expression LastExprTimestamp time.Time `yaml:"last_expr_timestamp"` } @@ -270,11 +272,26 @@ func (p *Parser) parseMetric(cfg *config.MetricConfig, metricID string, value in ingestTime = now() } + // generate dynamic labels + var labels map[string]string + if len(cfg.DynamicLabels) > 0 { + labels = make(map[string]string, len(cfg.DynamicLabels)) + for k, v := range cfg.DynamicLabels { + value, err := p.evalExpressionLabel(metricID, k, v, value, metricValue) + if err != nil { + return Metric{}, err + } + labels[k] = value + } + } + return Metric{ Description: cfg.PrometheusDescription(), Value: metricValue, ValueType: cfg.PrometheusValueType(), IngestTime: ingestTime, + Labels: labels, + LabelsKeys: cfg.DynamicLabelsKeys(), }, nil } @@ -408,3 +425,48 @@ func (p *Parser) evalExpressionValue(metricID, code string, raw_value interface{ return ret, nil } + +// evalExpressionLabel runs the given code in the metric's environment and returns the result. +// In case of an error, the original value is returned. +func (p *Parser) evalExpressionLabel(metricID, label, code string, rawValue interface{}, value float64) (string, error) { + ms, err := p.getMetricState(label + "@" + metricID) + if err != nil { + return "", err + } + if ms.program == nil { + ms.env = defaultExprEnv() + ms.program, err = expr.Compile(code, expr.Env(ms.env)) + if err != nil { + return "", fmt.Errorf("failed to compile dynamic label expression %q: %w", code, err) + } + // Trigger flushing the new state to disk. + ms.lastWritten = time.Time{} + } + + // Update the environment + ms.env[env_raw_value] = rawValue + ms.env[env_value] = value + ms.env[env_last_value] = ms.dynamic.LastExprValue + ms.env[env_last_raw_value] = ms.dynamic.LastExprRawValue + ms.env[env_last_result] = ms.dynamic.LastExprResultString + if ms.dynamic.LastExprTimestamp.IsZero() { + ms.env[env_elapsed] = time.Duration(0) + } else { + ms.env[env_elapsed] = now().Sub(ms.dynamic.LastExprTimestamp) + } + + result, err := expr.Run(ms.program, ms.env) + if err != nil { + return "", fmt.Errorf("failed to evaluate dynamic label expression %q: %w", code, err) + } + + // convert to string + ret := fmt.Sprint(result) + + // Update the dynamic state + ms.dynamic.LastExprResultString = ret + ms.dynamic.LastExprValue = value + ms.dynamic.LastExprTimestamp = now() + + return ret, nil +} diff --git a/pkg/metrics/parser_test.go b/pkg/metrics/parser_test.go index 5a8036a..d3ddc19 100644 --- a/pkg/metrics/parser_test.go +++ b/pkg/metrics/parser_test.go @@ -101,6 +101,34 @@ func TestParser_parseMetric(t *testing.T) { Topic: "", }, }, + { + name: "string value with dynamic label", + fields: fields{ + map[string][]*config.MetricConfig{ + "temperature": { + { + PrometheusName: "temperature", + ValueType: "gauge", + DynamicLabels: map[string]string{"dynamic-label": `replace(raw_value, ".", "")`}, + }, + }, + }, + }, + args: args{ + metricPath: "temperature", + deviceID: "dht22", + value: "12.6", + }, + want: Metric{ + Description: prometheus.NewDesc("temperature", "", []string{"sensor", "topic", "dynamic-label"}, nil), + ValueType: prometheus.GaugeValue, + Value: 12.6, + IngestTime: testNow(), + Topic: "", + Labels: map[string]string{"dynamic-label": "126"}, + LabelsKeys: []string{"dynamic-label"}, + }, + }, { name: "scaled string value", fields: fields{