diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..1470bf4 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,3 @@ +bin/ +.git +systemd/ \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d745efb --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +config.yaml +bin/ +vendor \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..508d656 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,9 @@ +FROM golang:1.10 as builder + +COPY . /go/src/github.com/hikhvar/mqtt2prometheus +WORKDIR /go/src/github.com/hikhvar/mqtt2prometheus +RUN make static_build TARGET_FILE=/bin/mqtt2prometheus + +FROM scratch +COPY --from=builder /bin/mqtt2prometheus /mqtt2prometheus +CMD ["/mqtt2prometheus"] \ No newline at end of file diff --git a/Gopkg.lock b/Gopkg.lock new file mode 100644 index 0000000..5c671d5 --- /dev/null +++ b/Gopkg.lock @@ -0,0 +1,93 @@ +# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'. + + +[[projects]] + branch = "master" + name = "github.com/beorn7/perks" + packages = ["quantile"] + revision = "4c0e84591b9aa9e6dcfdf3e020114cd81f89d5f9" + +[[projects]] + name = "github.com/eclipse/paho.mqtt.golang" + packages = [ + ".", + "packets" + ] + revision = "aff15770515e3c57fc6109da73d42b0d46f7f483" + version = "v1.1.0" + +[[projects]] + name = "github.com/golang/protobuf" + packages = ["proto"] + revision = "925541529c1fa6821df4e44ce2723319eb2be768" + version = "v1.0.0" + +[[projects]] + name = "github.com/matttproud/golang_protobuf_extensions" + packages = ["pbutil"] + revision = "3247c84500bff8d9fb6d579d800f20b3e091582c" + version = "v1.0.0" + +[[projects]] + name = "github.com/patrickmn/go-cache" + packages = ["."] + revision = "a3647f8e31d79543b2d0f0ae2fe5c379d72cedc0" + version = "v2.1.0" + +[[projects]] + name = "github.com/prometheus/client_golang" + packages = [ + "prometheus", + "prometheus/promhttp" + ] + revision = "c5b7fccd204277076155f10851dad72b76a49317" + version = "v0.8.0" + +[[projects]] + branch = "master" + name = "github.com/prometheus/client_model" + packages = ["go"] + revision = "99fa1f4be8e564e8a6b613da7fa6f46c9edafc6c" + +[[projects]] + branch = "master" + name = "github.com/prometheus/common" + packages = [ + "expfmt", + "internal/bitbucket.org/ww/goautoneg", + "model" + ] + revision = "e4aa40a9169a88835b849a6efb71e05dc04b88f0" + +[[projects]] + branch = "master" + name = "github.com/prometheus/procfs" + packages = [ + ".", + "internal/util", + "nfs", + "xfs" + ] + revision = "54d17b57dd7d4a3aa092476596b3f8a933bde349" + +[[projects]] + branch = "master" + name = "golang.org/x/net" + packages = [ + "proxy", + "websocket" + ] + revision = "24dd3780ca4f75fed9f321890729414a4b5d3f13" + +[[projects]] + name = "gopkg.in/yaml.v2" + packages = ["."] + revision = "7f97868eec74b32b0982dd158a51a446d1da7eb5" + version = "v2.1.1" + +[solve-meta] + analyzer-name = "dep" + analyzer-version = 1 + inputs-digest = "bbe7d36dddf85170c7fa8504fb8d1c3dfd9cbcd1199debd977ec7a5b2cfa7930" + solver-name = "gps-cdcl" + solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml new file mode 100644 index 0000000..ce1eae2 --- /dev/null +++ b/Gopkg.toml @@ -0,0 +1,46 @@ +# Gopkg.toml example +# +# Refer to https://github.com/golang/dep/blob/master/docs/Gopkg.toml.md +# for detailed Gopkg.toml documentation. +# +# required = ["github.com/user/thing/cmd/thing"] +# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"] +# +# [[constraint]] +# name = "github.com/user/project" +# version = "1.0.0" +# +# [[constraint]] +# name = "github.com/user/project2" +# branch = "dev" +# source = "github.com/myfork/project2" +# +# [[override]] +# name = "github.com/x/y" +# version = "2.4.0" +# +# [prune] +# non-go = false +# go-tests = true +# unused-packages = true + + +[[constraint]] + name = "github.com/eclipse/paho.mqtt.golang" + version = "1.1.0" + +[[constraint]] + name = "github.com/patrickmn/go-cache" + version = "2.1.0" + +[[constraint]] + name = "github.com/prometheus/client_golang" + version = "0.8.0" + +[[constraint]] + name = "gopkg.in/yaml.v2" + version = "2.1.1" + +[prune] + go-tests = true + unused-packages = true diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..66ee9bc --- /dev/null +++ b/Makefile @@ -0,0 +1,42 @@ + +ifndef GOARCH + GOARCH:=$(shell go env GOARCH) +endif + +ifndef GOOS + GOOS:=$(shell go env GOOS) +endif + +ifndef TARGET_FILE + TARGET_FILE:=bin/mqtt2prometheus.$(GOOS)_$(GOARCH) +endif + +install-dep: + @which dep || go get -u github.com/golang/dep/cmd/dep + +Gopkg.lock: | install-dep + dep ensure --no-vendor + +Gopkg.toml: | install-dep + dep init + +prepare-vendor: Gopkg.toml Gopkg.lock + dep ensure -update --no-vendor + dep status + @echo "You can apply these locks via 'make apply-vendor-lock' or rollback via 'git checkout -- Gopkg.lock'" + +vendor: Gopkg.toml Gopkg.lock + dep ensure -vendor-only + dep status + +test: + go test ./... + +build: vendor + GOOS=$(GOOS) GOARCH=$(GOARCH) go build -o $(TARGET_FILE) ./cmd + +static_build: vendor + CGO_ENABLED=0 GOOS=$(GOOS) GOARCH=$(GOARCH) go build -o $(TARGET_FILE) -a -tags netgo -ldflags '-w -extldflags "-static"' ./cmd + +container: + docker build -t mqtt2prometheus:latest . diff --git a/Readme.md b/Readme.md new file mode 100644 index 0000000..34c214a --- /dev/null +++ b/Readme.md @@ -0,0 +1,119 @@ +# MQTT2Prometheus +This exporter is an analog to the [Prometheus Pushgateway](https://github.com/prometheus/pushgateway). Clients can push +metrics via MQTT to an MQTT Broker. This exporter subscribes to the broker and +publish the received messages as prometheus metrics. I wrote this exporter to publish +metrics from small embedded sensors based on the NodeMCU to prometheus. + +## Assumptions about Messages and Topics +This exporter makes some assumptions about the message format and MQTT topics. This exporter assumes that each +client publish the metrics into a dedicated topic. The last level topic becomes the `sensor` label in prometheus. +This exporter assume that the message are JSON objects with only float fields. The golang type for the messages is: + +```go +type MQTTPayload map[string]float64 +``` + +For example the message + +```json +"temperature":23.20,"humidity":51.60,"heat_index":22.92} +``` + +published to the MQTT topic `devices/me/livingroom` becomes the following prometheus metrics: + +```text +temperature{sensor="livingroom"} 23.2 +heat_index{sensor="livingroom"} 22.92 +humidity{sensor="livingroom"} 51.6 +``` + +## Build + +To build the exporter run: + +```bash +make build +``` + +### Docker + +To build a docker container with the mqtt2prometheus exporter included run: + +```bash +make container +``` + +To run the container with a given config file: + +```bash +docker run -it -v "$(pwd)/config.yaml:/config.yaml" -p 8002:8002 mqtt2prometheus:latest +``` + +## Configuration +The exporter can be configured via command line and config file. + +### Commandline +Available command line flags: + +```text +Usage of ./mqtt2prometheus.linux_amd64: + -config string + config file (default "config.yaml") + -listen-address string + listen address for HTTP server used to expose metrics + -listen-port string + HTTP port used to expose metrics (default "8002") + +``` + +### Config file +The config file can look like this: + +```yaml +# Settings for the MQTT Client. Currently only these three are supported +mqtt: + # The MQTT broker to connect to + server: tcp://127.0.0.1:1883 + # The Topic path to subscripe to. Actually this will become `$topic_path/+` + topic_path: v1/devices/me + # The MQTT QoS level + qos: 0 +cache: + # Timeout. Each received metric will be presented for this time if no update is send via MQTT + timeout: 2min +# This is a list of valid metrics. Only metrics listed here will be exported +metrics: + # The name of the metric in prometheus + - prom_name: temperature + # The name of the metric in a MQTT JSON message + mqtt_name: temperature + # The prometheus help text for this metric + help: DHT22 temperature reading + # The prometheus type for this metric. Valid values are: "gauge" and "counter" + type: gauge + # A map of string to string for constant labels. This labels will be attached to every prometheus metric + const_labels: + sensor_type: dht22 + # The name of the metric in prometheus + - prom_name: humidity + # The name of the metric in a MQTT JSON message + mqtt_name: humidity + # The prometheus help text for this metric + help: DHT22 humidity reading + # The prometheus type for this metric. Valid values are: "gauge" and "counter" + type: gauge + # A map of string to string for constant labels. This labels will be attached to every prometheus metric + const_labels: + sensor_type: dht22 + # The name of the metric in prometheus + - prom_name: heat_index + # The name of the metric in a MQTT JSON message + mqtt_name: heat_index + # The prometheus help text for this metric + help: DHT22 heatIndex calculation + # The prometheus type for this metric. Valid values are: "gauge" and "counter" + type: gauge + # A map of string to string for constant labels. This labels will be attached to every prometheus metric + const_labels: + sensor_type: dht22% +``` \ No newline at end of file diff --git a/cmd/mqtt2prometheus.go b/cmd/mqtt2prometheus.go new file mode 100644 index 0000000..3a953bf --- /dev/null +++ b/cmd/mqtt2prometheus.go @@ -0,0 +1,89 @@ +package main + +import ( + "log" + "net/http" + + "os" + + "time" + + "flag" + + "github.com/eclipse/paho.mqtt.golang" + "github.com/hikhvar/mqtt2prometheus/pkg/metrics" + "github.com/hikhvar/mqtt2prometheus/pkg/mqttclient" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + + "fmt" + + "github.com/hikhvar/mqtt2prometheus/pkg/config" +) + +var ( + configFlag = flag.String( + "config", + "config.yaml", + "config file", + ) + portFlag = flag.String( + "listen-port", + "8002", + "HTTP port used to expose metrics", + ) + addressFlag = flag.String( + "listen-address", + "0.0.0.0", + "listen address for HTTP server used to expose metrics", + ) +) + +func main() { + flag.Parse() + c := make(chan os.Signal, 1) + hostName, err := os.Hostname() + if err != nil { + log.Fatalf("Could not get hostname. %s\n", err.Error()) + } + cfg, err := config.LoadConfig(*configFlag) + if err != nil { + log.Fatalf("Could not load config: %s\n", err.Error()) + } + mqttClientOptions := mqtt.NewClientOptions() + mqttClientOptions.AddBroker(cfg.MQTT.Server).SetClientID(hostName).SetCleanSession(true) + collector := metrics.NewCollector(2*time.Minute, cfg.Metrics) + ingest := metrics.NewIngest(collector, cfg.Metrics) + + var errorChan chan error + err = mqttclient.Subscribe(mqttClientOptions, mqttclient.SubscribeOptions{ + Topic: cfg.MQTT.TopicPath + "/+", + QoS: cfg.MQTT.QoS, + OnMessageReceived: ingest.SetupSubscriptionHandler(errorChan), + }) + if err != nil { + log.Fatalf("Could not connect to mqtt broker %s", err.Error()) + } + prometheus.MustRegister(ingest.MessageMetric) + prometheus.MustRegister(collector) + http.Handle("/metrics", promhttp.Handler()) + go func() { + err = http.ListenAndServe(getListenAddress(), nil) + if err != nil { + log.Fatalf("Error while serving http: %s", err.Error()) + } + }() + for { + select { + case <-c: + log.Println("Terminated via Signal. Stop.") + os.Exit(0) + case err = <-errorChan: + log.Printf("Error while processing message. %s", err.Error()) + } + } +} + +func getListenAddress() string { + return fmt.Sprintf("%s:%s", *addressFlag, *portFlag) +} diff --git a/config.yaml.dist b/config.yaml.dist new file mode 100644 index 0000000..dec7192 --- /dev/null +++ b/config.yaml.dist @@ -0,0 +1,46 @@ +# Settings for the MQTT Client. Currently only these three are supported +mqtt: + # The MQTT broker to connect to + server: tcp://127.0.0.1:1883 + # The Topic path to subscripe to. Actually this will become `$topic_path/+` + topic_path: v1/devices/me + # The MQTT QoS level + qos: 0 +cache: + # Timeout. Each received metric will be presented for this time if no update is send via MQTT + timeout: 2min +# This is a list of valid metrics. Only metrics listed here will be exported +metrics: + # The name of the metric in prometheus + - prom_name: temperature + # The name of the metric in a MQTT JSON message + mqtt_name: temperature + # The prometheus help text for this metric + help: DHT22 temperature reading + # The prometheus type for this metric. Valid values are: "gauge" and "counter" + type: gauge + # A map of string to string for constant labels. This labels will be attached to every prometheus metric + const_labels: + sensor_type: dht22 + # The name of the metric in prometheus + - prom_name: humidity + # The name of the metric in a MQTT JSON message + mqtt_name: humidity + # The prometheus help text for this metric + help: DHT22 humidity reading + # The prometheus type for this metric. Valid values are: "gauge" and "counter" + type: gauge + # A map of string to string for constant labels. This labels will be attached to every prometheus metric + const_labels: + sensor_type: dht22 + # The name of the metric in prometheus + - prom_name: heat_index + # The name of the metric in a MQTT JSON message + mqtt_name: heat_index + # The prometheus help text for this metric + help: DHT22 heatIndex calculation + # The prometheus type for this metric. Valid values are: "gauge" and "counter" + type: gauge + # A map of string to string for constant labels. This labels will be attached to every prometheus metric + const_labels: + sensor_type: dht22 \ No newline at end of file diff --git a/pkg/config/config.go b/pkg/config/config.go new file mode 100644 index 0000000..c1995d4 --- /dev/null +++ b/pkg/config/config.go @@ -0,0 +1,83 @@ +package config + +import ( + "time" + + "io/ioutil" + + "github.com/prometheus/client_golang/prometheus" + "gopkg.in/yaml.v2" +) + +const GaugeValueType = "gauge" +const CounterValueType = "counter" + +var MQTTConfigDefaults = MQTTConfig{ + Server: "tcp://127.0.0.1:1883", + TopicPath: "v1/devices/me", + QoS: 0, +} + +var CacheConfigDefaults = CacheConfig{ + Timeout: 2 + time.Minute, +} + +type Config struct { + Metrics []MetricConfig `yaml:"metrics"` + MQTT *MQTTConfig `yaml:"mqtt,omitempty"` + Cache *CacheConfig `yaml:"timeout,omitempty"` +} + +type CacheConfig struct { + Timeout time.Duration `yaml:"timeout"` +} + +type MQTTConfig struct { + Server string `yaml:"server"` + TopicPath string `yaml:"topic_path"` + QoS byte `yaml:"qos"` +} + +// Metrics Config is a mapping between a metric send on mqtt to a prometheus metric +type MetricConfig struct { + PrometheusName string `yaml:"prom_name"` + MQTTName string `yaml:"mqtt_name"` + Help string `yaml:"help"` + ValueType string `yaml:"type"` + ConstantLabels map[string]string `yaml:"const_labels"` +} + +func (mc *MetricConfig) PrometheusDescription() *prometheus.Desc { + return prometheus.NewDesc( + mc.PrometheusName, mc.Help, []string{"sensor"}, mc.ConstantLabels, + ) +} + +func (mc *MetricConfig) PrometheusValueType() prometheus.ValueType { + switch mc.ValueType { + case GaugeValueType: + return prometheus.GaugeValue + case CounterValueType: + return prometheus.CounterValue + default: + return prometheus.UntypedValue + } +} + +func LoadConfig(configFile string) (Config, error) { + configData, err := ioutil.ReadFile(configFile) + if err != nil { + return Config{}, err + } + var cfg Config + if err = yaml.Unmarshal(configData, &cfg); err != nil { + return cfg, err + } + if cfg.MQTT == nil { + cfg.MQTT = &MQTTConfigDefaults + } + if cfg.Cache == nil { + cfg.Cache = &CacheConfigDefaults + } + return cfg, nil +} diff --git a/pkg/metrics/collector.go b/pkg/metrics/collector.go new file mode 100644 index 0000000..2860805 --- /dev/null +++ b/pkg/metrics/collector.go @@ -0,0 +1,68 @@ +package metrics + +import ( + "time" + + "github.com/hikhvar/mqtt2prometheus/pkg/config" + gocache "github.com/patrickmn/go-cache" + "github.com/prometheus/client_golang/prometheus" +) + +const DefaultTimeout = 0 + +type Collector interface { + prometheus.Collector + Observe(deviceID string, collection MetricCollection) +} + +type MemoryCachedCollector struct { + cache *gocache.Cache + descriptions []*prometheus.Desc +} + +type Metric struct { + Description *prometheus.Desc + Value float64 + ValueType prometheus.ValueType +} + +type MetricCollection []Metric + +func NewCollector(defaultTimeout time.Duration, possibleMetrics []config.MetricConfig) Collector { + var descs []*prometheus.Desc + for _, m := range possibleMetrics { + descs = append(descs, m.PrometheusDescription()) + } + return &MemoryCachedCollector{ + cache: gocache.New(defaultTimeout, defaultTimeout*10), + descriptions: descs, + } +} + +func (c *MemoryCachedCollector) Observe(deviceID string, collection MetricCollection) { + c.cache.Set(deviceID, collection, DefaultTimeout) +} + +func (c *MemoryCachedCollector) Describe(ch chan<- *prometheus.Desc) { + for i := range c.descriptions { + ch <- c.descriptions[i] + } +} + +func (c *MemoryCachedCollector) Collect(mc chan<- prometheus.Metric) { + for device, metricsRaw := range c.cache.Items() { + metrics := metricsRaw.Object.(MetricCollection) + for _, metric := range metrics { + m, err := prometheus.NewConstMetric( + metric.Description, + metric.ValueType, + metric.Value, + device, + ) + if err != nil { + panic(err) + } + mc <- m + } + } +} diff --git a/pkg/metrics/ingest.go b/pkg/metrics/ingest.go new file mode 100644 index 0000000..555556b --- /dev/null +++ b/pkg/metrics/ingest.go @@ -0,0 +1,80 @@ +package metrics + +import ( + "errors" + + "path/filepath" + + "encoding/json" + + "fmt" + + "log" + + "github.com/eclipse/paho.mqtt.golang" + "github.com/hikhvar/mqtt2prometheus/pkg/config" + "github.com/prometheus/client_golang/prometheus" +) + +var NoValidPayload = errors.New("no valid MQTT payload") + +type Ingest struct { + validMetrics map[string]config.MetricConfig + collector Collector + MessageMetric *prometheus.CounterVec +} + +func NewIngest(collector Collector, metrics []config.MetricConfig) *Ingest { + valid := make(map[string]config.MetricConfig) + for i := range metrics { + key := metrics[i].MQTTName + valid[key] = metrics[i] + } + return &Ingest{ + validMetrics: valid, + collector: collector, + MessageMetric: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "received_messages", + Help: "received messages per topic and status", + }, []string{"status", "topic"}, + ), + } +} + +type MQTTPayload map[string]float64 + +func (i *Ingest) store(deviceID string, rawMetrics MQTTPayload) error { + var mc MetricCollection + for metricName, value := range rawMetrics { + if cfg, found := i.validMetrics[metricName]; found { + mc = append(mc, Metric{ + Description: cfg.PrometheusDescription(), + Value: value, + ValueType: cfg.PrometheusValueType(), + }) + } + } + i.collector.Observe(deviceID, mc) + return nil +} + +func (i *Ingest) SetupSubscriptionHandler(errChan chan<- error) mqtt.MessageHandler { + return func(c mqtt.Client, m mqtt.Message) { + log.Printf("Got message '%s' on topic %s\n", string(m.Payload()), m.Topic()) + deviceId := filepath.Base(m.Topic()) + var rawMetrics MQTTPayload + err := json.Unmarshal(m.Payload(), &rawMetrics) + if err != nil { + errChan <- fmt.Errorf("could not decode message '%s' on topic %s: %s", string(m.Payload()), m.Topic(), err.Error()) + i.MessageMetric.WithLabelValues("decodeError", m.Topic()).Desc() + } + err = i.store(deviceId, rawMetrics) + if err != nil { + errChan <- fmt.Errorf("could not store metrics '%s' on topic %s: %s", string(m.Payload()), m.Topic(), err.Error()) + i.MessageMetric.WithLabelValues("storeError", m.Topic()).Inc() + } + i.MessageMetric.WithLabelValues("success", m.Topic()).Inc() + } + +} diff --git a/pkg/mqttclient/mqttClient.go b/pkg/mqttclient/mqttClient.go new file mode 100644 index 0000000..bd79cea --- /dev/null +++ b/pkg/mqttclient/mqttClient.go @@ -0,0 +1,29 @@ +package mqttclient + +import ( + "log" + + "github.com/eclipse/paho.mqtt.golang" +) + +type SubscribeOptions struct { + Topic string + QoS byte + OnMessageReceived mqtt.MessageHandler +} + +func Subscribe(connectionOptions *mqtt.ClientOptions, subscribeOptions SubscribeOptions) error { + connectionOptions.OnConnect = func(client mqtt.Client) { + log.Print("Connected to MQTT Broker.\n") + log.Printf("Will subscribe to topic %s", subscribeOptions.Topic) + if token := client.Subscribe(subscribeOptions.Topic, subscribeOptions.QoS, subscribeOptions.OnMessageReceived); token.Wait() && token.Error() != nil { + log.Printf("Could not subscribe %s\n", token.Error().Error()) + } + } + client := mqtt.NewClient(connectionOptions) + if token := client.Connect(); token.Wait() && token.Error() != nil { + return token.Error() + } + + return nil +} diff --git a/systemd/mqtt2prometheus.service b/systemd/mqtt2prometheus.service new file mode 100644 index 0000000..e9c8394 --- /dev/null +++ b/systemd/mqtt2prometheus.service @@ -0,0 +1,9 @@ +[Unit] +Description=Simple translator from mqtt messages to prometheus. Analog to pushgateway +Before=prometheus.service + +[Service] +ExecStart=/opt/mqtt2prometheus/mqtt2prometheus -config /etc/mqtt2prometheus/config.yaml -port 8002 + +[Install] +WantedBy=multi-user.target