mirror of
https://github.com/hikhvar/mqtt2prometheus.git
synced 2026-05-15 04:56:52 +00:00
Merge pull request #10 from oxplot/sensor-name-filtering
Sensor name filtering
This commit is contained in:
@@ -1,17 +1,19 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"flag"
|
||||
|
||||
"github.com/eclipse/paho.mqtt.golang"
|
||||
"github.com/hikhvar/mqtt2prometheus/pkg/metrics"
|
||||
"github.com/hikhvar/mqtt2prometheus/pkg/mqttclient"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"fmt"
|
||||
|
||||
"github.com/hikhvar/mqtt2prometheus/pkg/config"
|
||||
"github.com/hikhvar/mqtt2prometheus/pkg/metrics"
|
||||
"github.com/hikhvar/mqtt2prometheus/pkg/mqttclient"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -51,7 +53,7 @@ func main() {
|
||||
collector := metrics.NewCollector(cfg.Cache.Timeout, cfg.Metrics)
|
||||
ingest := metrics.NewIngest(collector, cfg.Metrics)
|
||||
|
||||
errorChan := make(chan error,1)
|
||||
errorChan := make(chan error, 1)
|
||||
|
||||
err = mqttclient.Subscribe(mqttClientOptions, mqttclient.SubscribeOptions{
|
||||
Topic: cfg.MQTT.TopicPath + "/+",
|
||||
|
||||
@@ -47,9 +47,12 @@ metrics:
|
||||
# A map of string to string for constant labels. This labels will be attached to every prometheus metric
|
||||
const_labels:
|
||||
sensor_type: dht22
|
||||
# The name of the metric in prometheus
|
||||
- prom_name: state
|
||||
# The name of the metric in a MQTT JSON message
|
||||
mqtt_name: state
|
||||
# Regular expression to only match sensors with the given name pattern
|
||||
sensor_name_filter: "^.*-light$"
|
||||
# The prometheus help text for this metric
|
||||
help: Light state
|
||||
# The prometheus type for this metric. Valid values are: "gauge" and "counter"
|
||||
|
||||
@@ -2,6 +2,7 @@ package config
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"regexp"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
@@ -21,6 +22,33 @@ var CacheConfigDefaults = CacheConfig{
|
||||
Timeout: 2 * time.Minute,
|
||||
}
|
||||
|
||||
type RegexpFilter struct {
|
||||
r *regexp.Regexp
|
||||
pattern string
|
||||
}
|
||||
|
||||
func (rf *RegexpFilter) UnmarshalYAML(unmarshal func(interface{}) error) error {
|
||||
var pattern string
|
||||
if err := unmarshal(&pattern); err != nil {
|
||||
return err
|
||||
}
|
||||
r, err := regexp.Compile(pattern)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rf.r = r
|
||||
rf.pattern = pattern
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rf *RegexpFilter) MarshalYAML() (interface{}, error) {
|
||||
return rf.pattern, nil
|
||||
}
|
||||
|
||||
func (rf *RegexpFilter) Match(s string) bool {
|
||||
return rf.r == nil || rf.r.MatchString(s)
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
Metrics []MetricConfig `yaml:"metrics"`
|
||||
MQTT *MQTTConfig `yaml:"mqtt,omitempty"`
|
||||
@@ -43,6 +71,7 @@ type MQTTConfig struct {
|
||||
type MetricConfig struct {
|
||||
PrometheusName string `yaml:"prom_name"`
|
||||
MQTTName string `yaml:"mqtt_name"`
|
||||
SensorNameFilter RegexpFilter `yaml:"sensor_name_filter"`
|
||||
Help string `yaml:"help"`
|
||||
ValueType string `yaml:"type"`
|
||||
ConstantLabels map[string]string `yaml:"const_labels"`
|
||||
|
||||
@@ -13,20 +13,20 @@ import (
|
||||
)
|
||||
|
||||
type Ingest struct {
|
||||
validMetrics map[string]config.MetricConfig
|
||||
metricConfigs map[string][]config.MetricConfig
|
||||
collector Collector
|
||||
MessageMetric *prometheus.CounterVec
|
||||
}
|
||||
|
||||
func NewIngest(collector Collector, metrics []config.MetricConfig) *Ingest {
|
||||
valid := make(map[string]config.MetricConfig)
|
||||
cfgs := make(map[string][]config.MetricConfig)
|
||||
for i := range metrics {
|
||||
key := metrics[i].MQTTName
|
||||
valid[key] = metrics[i]
|
||||
cfgs[key] = append(cfgs[key], metrics[i])
|
||||
}
|
||||
return &Ingest{
|
||||
validMetrics: valid,
|
||||
collector: collector,
|
||||
metricConfigs: cfgs,
|
||||
collector: collector,
|
||||
MessageMetric: prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "received_messages",
|
||||
@@ -36,13 +36,24 @@ func NewIngest(collector Collector, metrics []config.MetricConfig) *Ingest {
|
||||
}
|
||||
}
|
||||
|
||||
// validMetric returns config matching the metric and deviceID
|
||||
// Second return value indicates if config was found.
|
||||
func (i *Ingest) validMetric(metric string, deviceID string) (config.MetricConfig, bool) {
|
||||
for _, c := range i.metricConfigs[metric] {
|
||||
if c.SensorNameFilter.Match(deviceID) {
|
||||
return c, true
|
||||
}
|
||||
}
|
||||
return config.MetricConfig{}, false
|
||||
}
|
||||
|
||||
type MQTTPayload map[string]interface{}
|
||||
|
||||
func (i *Ingest) store(deviceID string, rawMetrics MQTTPayload) error {
|
||||
var mc MetricCollection
|
||||
|
||||
for metricName, value := range rawMetrics {
|
||||
cfg, cfgFound := i.validMetrics[metricName]
|
||||
cfg, cfgFound := i.validMetric(metricName, deviceID)
|
||||
if !cfgFound {
|
||||
continue
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user