Refactor metric extraction from MQTT

This commit allows to extract the metric name from the topic path. Now
it can be configured if all metrics are in a object in a certain topic
or if every topic contains exactly one metric. However, currently these
modes can not be mixed.

This should solve !26

TODO:
* Update documentation
* Add unit tests
This commit is contained in:
Christoph Petrausch
2020-11-08 22:01:36 +01:00
parent b3442ecf3b
commit be4af9ff5e
13 changed files with 330 additions and 119 deletions

View File

@@ -54,6 +54,11 @@ var (
)
)
type Ingest interface {
MessageMetric() *prometheus.CounterVec
SetupSubscriptionHandler(errChan chan<- error) mqtt.MessageHandler
}
func main() {
flag.Parse()
if *versionFlag {
@@ -77,9 +82,12 @@ func main() {
mqttClientOptions.SetPassword(cfg.MQTT.Password)
mqttClientOptions.SetClientID(mustMQTTClientID())
collector := metrics.NewCollector(cfg.Cache.Timeout, cfg.Metrics)
ingest := metrics.NewIngest(collector, cfg.Metrics, cfg.MQTT.DeviceIDRegex)
collector := metrics.NewCollector(cfg.Cache.Timeout, cfg.Metrics, logger)
extractor, err := setupExtractor(cfg)
if err != nil {
logger.Fatal("could not setup a metric extractor", zap.Error(err))
}
ingest := metrics.NewIngest(collector, extractor, cfg.MQTT.DeviceIDRegex)
errorChan := make(chan error, 1)
for {
@@ -97,7 +105,7 @@ func main() {
time.Sleep(10 * time.Second)
}
prometheus.MustRegister(ingest.MessageMetric)
prometheus.MustRegister(ingest.MessageMetric())
prometheus.MustRegister(collector)
http.Handle("/metrics", promhttp.Handler())
go func() {
@@ -163,3 +171,19 @@ func mustSetupLogger() *zap.Logger {
config.SetProcessContext(logger)
return logger
}
func setupExtractor(cfg config.Config) (metrics.Extractor, error) {
parser := metrics.NewParser(cfg.Metrics)
if cfg.MQTT.ObjectPerTopicConfig != nil {
switch cfg.MQTT.ObjectPerTopicConfig.Encoding {
case config.EncodingJSON:
return metrics.NewJSONObjectExtractor(parser), nil
default:
return nil, fmt.Errorf("unsupported object format: %s", cfg.MQTT.ObjectPerTopicConfig.Encoding)
}
}
if cfg.MQTT.MetricPerTopicConfig != nil {
return metrics.NewMetricPerTopicExtractor(parser, cfg.MQTT.MetricPerTopicConfig.MetricNameRegex), nil
}
return nil, fmt.Errorf("no extractor configured")
}

29
hack/Readme.md Normal file
View File

@@ -0,0 +1,29 @@
# Hack Scenarios
Required is a MQTT client. I use this: https://github.com/shirou/mqttcli
## Shelly (Metric Per Topic)
The scenario is the feature requested by issue https://github.com/hikhvar/mqtt2prometheus/issues/26.
To start the scenario run:
```bash
docker-compose --env-file shelly.env up
```
To publish a message run:
```bash
mqttcli pub --host localhost -p 1883 -t shellies/living-room/sensor/temperature '15'
```
## DHT22 (Object Per Topic)
The default scenario
To start the scenario run:
```bash
docker-compose --env-file dht22.env up
```
To publish a message run:
```bash
mqttcli pub --host localhost -p 1883 -t v1/devices/me/test -m '{"temperature":"12", "humidity":21}'
```

1
hack/dht.env Normal file
View File

@@ -0,0 +1 @@
CONFIG=dht22.yaml

View File

@@ -4,11 +4,15 @@ services:
build:
context: ../
dockerfile: Dockerfile
command:
- /mqtt2prometheus
- -log-level
- debug
ports:
- 9641:9641
volumes:
- type: bind
source: ./mqtt2prometheus.yaml
source: ./${CONFIG:-dht22.yaml}
target: /config.yaml
mosquitto:
image: eclipse-mosquitto:1.6.9

1
hack/shelly.env Normal file
View File

@@ -0,0 +1 @@
CONFIG=shelly.yaml

21
hack/shelly.yaml Normal file
View File

@@ -0,0 +1,21 @@
mqtt:
server: tcp://mosquitto:1883
topic_path: shellies/+/sensor/+
device_id_regex: "shellies/(?P<deviceid>.*)/sensor"
metric_per_topic_config:
metric_name_regex: "shellies/(?P<deviceid>.*)/sensor/(?P<metricname>.*)"
qos: 0
cache:
timeout: 24h
metrics:
- prom_name: temperature
# The name of the metric in a MQTT JSON message
mqtt_name: temperature
# The prometheus help text for this metric
help: shelly temperature reading
# The prometheus type for this metric. Valid values are: "gauge" and "counter"
type: gauge
# A map of string to string for constant labels. This labels will be attached to every prometheus metric
const_labels:
sensor_type: shelly
# The name of the metric in prometheus

View File

@@ -10,10 +10,13 @@ import (
"gopkg.in/yaml.v2"
)
const GaugeValueType = "gauge"
const CounterValueType = "counter"
const (
GaugeValueType = "gauge"
CounterValueType = "counter"
const DeviceIDRegexGroup = "deviceid"
DeviceIDRegexGroup = "deviceid"
MetricNameRegexGroup = "metricname"
)
var MQTTConfigDefaults = MQTTConfig{
Server: "tcp://127.0.0.1:1883",
@@ -87,12 +90,24 @@ type CacheConfig struct {
}
type MQTTConfig struct {
Server string `yaml:"server"`
TopicPath string `yaml:"topic_path"`
DeviceIDRegex *Regexp `yaml:"device_id_regex"`
User string `yaml:"user"`
Password string `yaml:"password"`
QoS byte `yaml:"qos"`
Server string `yaml:"server"`
TopicPath string `yaml:"topic_path"`
DeviceIDRegex *Regexp `yaml:"device_id_regex"`
User string `yaml:"user"`
Password string `yaml:"password"`
QoS byte `yaml:"qos"`
ObjectPerTopicConfig *ObjectPerTopicConfig `yaml:"object_per_topic_config"`
MetricPerTopicConfig *MetricPerTopicConfig `yaml:"metric_per_topic_config"`
}
const EncodingJSON = "JSON"
type ObjectPerTopicConfig struct {
Encoding string `yaml:"encoding"` // Currently only JSON is a valid value
}
type MetricPerTopicConfig struct {
MetricNameRegex *Regexp `yaml:"metric_name_regex"` // Default
}
// Metrics Config is a mapping between a metric send on mqtt to a prometheus metric
@@ -157,5 +172,23 @@ func LoadConfig(configFile string) (Config, error) {
if !validRegex {
return Config{}, fmt.Errorf("device id regex %q does not contain required regex group %q", cfg.MQTT.DeviceIDRegex.pattern, DeviceIDRegexGroup)
}
if cfg.MQTT.ObjectPerTopicConfig == nil && cfg.MQTT.MetricPerTopicConfig == nil {
cfg.MQTT.ObjectPerTopicConfig = &ObjectPerTopicConfig{
Encoding: EncodingJSON,
}
}
if cfg.MQTT.MetricPerTopicConfig != nil {
validRegex = false
for _, name := range cfg.MQTT.MetricPerTopicConfig.MetricNameRegex.RegEx().SubexpNames() {
if name == MetricNameRegexGroup {
validRegex = true
}
}
if !validRegex {
return Config{}, fmt.Errorf("metric name regex %q does not contain required regex group %q", cfg.MQTT.DeviceIDRegex.pattern, MetricNameRegexGroup)
}
}
return cfg, nil
}

View File

@@ -1,6 +1,7 @@
package metrics
import (
"go.uber.org/zap"
"time"
"github.com/hikhvar/mqtt2prometheus/pkg/config"
@@ -18,6 +19,7 @@ type Collector interface {
type MemoryCachedCollector struct {
cache *gocache.Cache
descriptions []*prometheus.Desc
logger *zap.Logger
}
type Metric struct {
@@ -30,7 +32,7 @@ type Metric struct {
type MetricCollection []Metric
func NewCollector(defaultTimeout time.Duration, possibleMetrics []config.MetricConfig) Collector {
func NewCollector(defaultTimeout time.Duration, possibleMetrics []config.MetricConfig, logger *zap.Logger) Collector {
var descs []*prometheus.Desc
for _, m := range possibleMetrics {
descs = append(descs, m.PrometheusDescription())
@@ -38,10 +40,14 @@ func NewCollector(defaultTimeout time.Duration, possibleMetrics []config.MetricC
return &MemoryCachedCollector{
cache: gocache.New(defaultTimeout, defaultTimeout*10),
descriptions: descs,
logger: logger,
}
}
func (c *MemoryCachedCollector) Observe(deviceID string, collection MetricCollection) {
if len(collection) < 1 {
return
}
c.cache.Set(deviceID, collection, DefaultTimeout)
}
@@ -55,6 +61,9 @@ func (c *MemoryCachedCollector) Collect(mc chan<- prometheus.Metric) {
for device, metricsRaw := range c.cache.Items() {
metrics := metricsRaw.Object.(MetricCollection)
for _, metric := range metrics {
if metric.Description == nil {
c.logger.Warn("empty description", zap.String("topic", metric.Topic), zap.Float64("value", metric.Value))
}
m := prometheus.MustNewConstMetric(
metric.Description,
metric.ValueType,

50
pkg/metrics/extractor.go Normal file
View File

@@ -0,0 +1,50 @@
package metrics
import (
"errors"
"fmt"
"github.com/hikhvar/mqtt2prometheus/pkg/config"
gojsonq "github.com/thedevsaddam/gojsonq/v2"
)
type Extractor func(topic string, payload []byte, deviceID string) (MetricCollection, error)
func NewJSONObjectExtractor(p Parser) Extractor {
return func(topic string, payload []byte, deviceID string) (MetricCollection, error) {
var mc MetricCollection
parsed := gojsonq.New().FromString(string(payload))
for path := range p.config() {
rawValue := parsed.Find(path)
parsed.Reset()
if rawValue == nil {
continue
}
m, err := p.parseMetric(path, deviceID, rawValue)
if err != nil {
return nil, fmt.Errorf("failed to parse valid metric value: %w", err)
}
m.Topic = topic
mc = append(mc, m)
}
return mc, nil
}
}
func NewMetricPerTopicExtractor(p Parser, metricName *config.Regexp) Extractor {
return func(topic string, payload []byte, deviceID string) (MetricCollection, error) {
mName := metricName.GroupValue(topic, config.MetricNameRegexGroup)
if mName == "" {
return nil, fmt.Errorf("failed to find valid metric in topic path")
}
m, err := p.parseMetric(mName, deviceID, string(payload))
if err != nil {
if errors.Is(err, metricNotConfigured) {
return nil, nil
}
return nil, fmt.Errorf("failed to parse metric: %w", err)
}
m.Topic = topic
return MetricCollection{m}, nil
}
}

View File

@@ -3,140 +3,50 @@ package metrics
import (
"fmt"
"go.uber.org/zap"
"strconv"
"time"
gojsonq "github.com/thedevsaddam/gojsonq/v2"
"github.com/eclipse/paho.mqtt.golang"
"github.com/hikhvar/mqtt2prometheus/pkg/config"
"github.com/prometheus/client_golang/prometheus"
)
type Ingest struct {
metricConfigs map[string][]config.MetricConfig
instrumentation
extractor Extractor
deviceIDRegex *config.Regexp
collector Collector
MessageMetric *prometheus.CounterVec
logger *zap.Logger
}
func NewIngest(collector Collector, metrics []config.MetricConfig, deviceIDRegex *config.Regexp) *Ingest {
cfgs := make(map[string][]config.MetricConfig)
for i := range metrics {
key := metrics[i].MQTTName
cfgs[key] = append(cfgs[key], metrics[i])
}
return &Ingest{
metricConfigs: cfgs,
deviceIDRegex: deviceIDRegex,
collector: collector,
MessageMetric: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "received_messages",
Help: "received messages per topic and status",
}, []string{"status", "topic"},
),
logger: config.ProcessContext.Logger(),
}
}
func NewIngest(collector Collector, extractor Extractor, deviceIDRegex *config.Regexp) *Ingest {
// validMetric returns config matching the metric and deviceID
// Second return value indicates if config was found.
func (i *Ingest) validMetric(metric string, deviceID string) (config.MetricConfig, bool) {
for _, c := range i.metricConfigs[metric] {
if c.SensorNameFilter.Match(deviceID) {
return c, true
}
return &Ingest{
instrumentation: defaultInstrumentation,
extractor: extractor,
deviceIDRegex: deviceIDRegex,
collector: collector,
logger: config.ProcessContext.Logger(),
}
return config.MetricConfig{}, false
}
func (i *Ingest) store(topic string, payload []byte) error {
var mc MetricCollection
deviceID := i.deviceID(topic)
parsed := gojsonq.New().FromString(string(payload))
for path := range i.metricConfigs {
rawValue := parsed.Find(path)
parsed.Reset()
if rawValue == nil {
continue
}
m, err := i.parseMetric(path, deviceID, rawValue)
if err != nil {
return fmt.Errorf("failed to parse valid metric value: %w", err)
}
m.Topic = topic
mc = append(mc, m)
mc, err := i.extractor(topic, payload, deviceID)
if err != nil {
return fmt.Errorf("failed to extract metric values from topic: %w", err)
}
i.collector.Observe(deviceID, mc)
return nil
}
func (i *Ingest) parseMetric(metricPath string, deviceID string, value interface{}) (Metric, error) {
cfg, cfgFound := i.validMetric(metricPath, deviceID)
if !cfgFound {
return Metric{}, nil
}
var metricValue float64
if boolValue, ok := value.(bool); ok {
if boolValue {
metricValue = 1
} else {
metricValue = 0
}
} else if strValue, ok := value.(string); ok {
// If string value mapping is defined, use that
if cfg.StringValueMapping != nil {
floatValue, ok := cfg.StringValueMapping.Map[strValue]
if ok {
metricValue = floatValue
} else if cfg.StringValueMapping.ErrorValue != nil {
metricValue = *cfg.StringValueMapping.ErrorValue
} else {
return Metric{}, fmt.Errorf("got unexpected string data '%s'", strValue)
}
} else {
// otherwise try to parse float
floatValue, err := strconv.ParseFloat(strValue, 64)
if err != nil {
return Metric{}, fmt.Errorf("got data with unexpectd type: %T ('%s') and failed to parse to float", value, value)
}
metricValue = floatValue
}
} else if floatValue, ok := value.(float64); ok {
metricValue = floatValue
} else {
return Metric{}, fmt.Errorf("got data with unexpectd type: %T ('%s')", value, value)
}
return Metric{
Description: cfg.PrometheusDescription(),
Value: metricValue,
ValueType: cfg.PrometheusValueType(),
IngestTime: time.Now(),
}, nil
}
func (i *Ingest) SetupSubscriptionHandler(errChan chan<- error) mqtt.MessageHandler {
return func(c mqtt.Client, m mqtt.Message) {
i.logger.Debug("Got message", zap.String("topic", m.Topic()), zap.String("payload", string(m.Payload())))
err := i.store(m.Topic(), m.Payload())
if err != nil {
errChan <- fmt.Errorf("could not store metrics '%s' on topic %s: %s", string(m.Payload()), m.Topic(), err.Error())
i.MessageMetric.WithLabelValues("storeError", m.Topic()).Inc()
i.CountStoreError(m.Topic())
return
}
i.MessageMetric.WithLabelValues("success", m.Topic()).Inc()
i.CountSuccess(m.Topic())
}
}

View File

@@ -0,0 +1,33 @@
package metrics
import "github.com/prometheus/client_golang/prometheus"
const (
storeError = "storeError"
success = "success"
)
var defaultInstrumentation = instrumentation{
messageMetric: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "received_messages",
Help: "received messages per topic and status",
}, []string{"status", "topic"},
),
}
type instrumentation struct {
messageMetric *prometheus.CounterVec
}
func (i *instrumentation) MessageMetric() *prometheus.CounterVec {
return i.messageMetric
}
func (i *instrumentation) CountSuccess(topic string) {
i.messageMetric.WithLabelValues(success, topic).Inc()
}
func (i *instrumentation) CountStoreError(topic string) {
i.messageMetric.WithLabelValues(storeError, topic).Inc()
}

96
pkg/metrics/parser.go Normal file
View File

@@ -0,0 +1,96 @@
package metrics
import (
"errors"
"fmt"
"github.com/hikhvar/mqtt2prometheus/pkg/config"
"strconv"
"time"
)
type metricNotConfiguredError error
var metricNotConfigured metricNotConfiguredError = errors.New("metric not configured failed to parse")
type Parser struct {
metricConfigs map[string][]config.MetricConfig
}
func NewParser(metrics []config.MetricConfig) Parser {
cfgs := make(map[string][]config.MetricConfig)
for i := range metrics {
key := metrics[i].MQTTName
cfgs[key] = append(cfgs[key], metrics[i])
}
return Parser{
metricConfigs: cfgs,
}
}
// Config returns the underlying metrics config
func (p *Parser) config() map[string][]config.MetricConfig {
return p.metricConfigs
}
// validMetric returns config matching the metric and deviceID
// Second return value indicates if config was found.
func (p *Parser) validMetric(metric string, deviceID string) (config.MetricConfig, bool) {
for _, c := range p.metricConfigs[metric] {
if c.SensorNameFilter.Match(deviceID) {
return c, true
}
}
return config.MetricConfig{}, false
}
func (p *Parser) parseMetric(metricPath string, deviceID string, value interface{}) (Metric, error) {
cfg, cfgFound := p.validMetric(metricPath, deviceID)
if !cfgFound {
return Metric{}, metricNotConfigured
}
var metricValue float64
if boolValue, ok := value.(bool); ok {
if boolValue {
metricValue = 1
} else {
metricValue = 0
}
} else if strValue, ok := value.(string); ok {
// If string value mapping is defined, use that
if cfg.StringValueMapping != nil {
floatValue, ok := cfg.StringValueMapping.Map[strValue]
if ok {
metricValue = floatValue
} else if cfg.StringValueMapping.ErrorValue != nil {
metricValue = *cfg.StringValueMapping.ErrorValue
} else {
return Metric{}, fmt.Errorf("got unexpected string data '%s'", strValue)
}
} else {
// otherwise try to parse float
floatValue, err := strconv.ParseFloat(strValue, 64)
if err != nil {
return Metric{}, fmt.Errorf("got data with unexpectd type: %T ('%s') and failed to parse to float", value, value)
}
metricValue = floatValue
}
} else if floatValue, ok := value.(float64); ok {
metricValue = floatValue
} else {
return Metric{}, fmt.Errorf("got data with unexpectd type: %T ('%s')", value, value)
}
return Metric{
Description: cfg.PrometheusDescription(),
Value: metricValue,
ValueType: cfg.PrometheusValueType(),
IngestTime: time.Now(),
}, nil
}