Initial commit

This commit is contained in:
Christoph Petrausch
2018-03-18 13:20:49 +01:00
parent d7c6eb631d
commit 7d41c58d70
14 changed files with 719 additions and 0 deletions

3
.dockerignore Normal file
View File

@@ -0,0 +1,3 @@
bin/
.git
systemd/

3
.gitignore vendored Normal file
View File

@@ -0,0 +1,3 @@
config.yaml
bin/
vendor

9
Dockerfile Normal file
View File

@@ -0,0 +1,9 @@
FROM golang:1.10 as builder
COPY . /go/src/github.com/hikhvar/mqtt2prometheus
WORKDIR /go/src/github.com/hikhvar/mqtt2prometheus
RUN make static_build TARGET_FILE=/bin/mqtt2prometheus
FROM scratch
COPY --from=builder /bin/mqtt2prometheus /mqtt2prometheus
CMD ["/mqtt2prometheus"]

93
Gopkg.lock generated Normal file
View File

@@ -0,0 +1,93 @@
# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'.
[[projects]]
branch = "master"
name = "github.com/beorn7/perks"
packages = ["quantile"]
revision = "4c0e84591b9aa9e6dcfdf3e020114cd81f89d5f9"
[[projects]]
name = "github.com/eclipse/paho.mqtt.golang"
packages = [
".",
"packets"
]
revision = "aff15770515e3c57fc6109da73d42b0d46f7f483"
version = "v1.1.0"
[[projects]]
name = "github.com/golang/protobuf"
packages = ["proto"]
revision = "925541529c1fa6821df4e44ce2723319eb2be768"
version = "v1.0.0"
[[projects]]
name = "github.com/matttproud/golang_protobuf_extensions"
packages = ["pbutil"]
revision = "3247c84500bff8d9fb6d579d800f20b3e091582c"
version = "v1.0.0"
[[projects]]
name = "github.com/patrickmn/go-cache"
packages = ["."]
revision = "a3647f8e31d79543b2d0f0ae2fe5c379d72cedc0"
version = "v2.1.0"
[[projects]]
name = "github.com/prometheus/client_golang"
packages = [
"prometheus",
"prometheus/promhttp"
]
revision = "c5b7fccd204277076155f10851dad72b76a49317"
version = "v0.8.0"
[[projects]]
branch = "master"
name = "github.com/prometheus/client_model"
packages = ["go"]
revision = "99fa1f4be8e564e8a6b613da7fa6f46c9edafc6c"
[[projects]]
branch = "master"
name = "github.com/prometheus/common"
packages = [
"expfmt",
"internal/bitbucket.org/ww/goautoneg",
"model"
]
revision = "e4aa40a9169a88835b849a6efb71e05dc04b88f0"
[[projects]]
branch = "master"
name = "github.com/prometheus/procfs"
packages = [
".",
"internal/util",
"nfs",
"xfs"
]
revision = "54d17b57dd7d4a3aa092476596b3f8a933bde349"
[[projects]]
branch = "master"
name = "golang.org/x/net"
packages = [
"proxy",
"websocket"
]
revision = "24dd3780ca4f75fed9f321890729414a4b5d3f13"
[[projects]]
name = "gopkg.in/yaml.v2"
packages = ["."]
revision = "7f97868eec74b32b0982dd158a51a446d1da7eb5"
version = "v2.1.1"
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "bbe7d36dddf85170c7fa8504fb8d1c3dfd9cbcd1199debd977ec7a5b2cfa7930"
solver-name = "gps-cdcl"
solver-version = 1

46
Gopkg.toml Normal file
View File

@@ -0,0 +1,46 @@
# Gopkg.toml example
#
# Refer to https://github.com/golang/dep/blob/master/docs/Gopkg.toml.md
# for detailed Gopkg.toml documentation.
#
# required = ["github.com/user/thing/cmd/thing"]
# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"]
#
# [[constraint]]
# name = "github.com/user/project"
# version = "1.0.0"
#
# [[constraint]]
# name = "github.com/user/project2"
# branch = "dev"
# source = "github.com/myfork/project2"
#
# [[override]]
# name = "github.com/x/y"
# version = "2.4.0"
#
# [prune]
# non-go = false
# go-tests = true
# unused-packages = true
[[constraint]]
name = "github.com/eclipse/paho.mqtt.golang"
version = "1.1.0"
[[constraint]]
name = "github.com/patrickmn/go-cache"
version = "2.1.0"
[[constraint]]
name = "github.com/prometheus/client_golang"
version = "0.8.0"
[[constraint]]
name = "gopkg.in/yaml.v2"
version = "2.1.1"
[prune]
go-tests = true
unused-packages = true

42
Makefile Normal file
View File

@@ -0,0 +1,42 @@
ifndef GOARCH
GOARCH:=$(shell go env GOARCH)
endif
ifndef GOOS
GOOS:=$(shell go env GOOS)
endif
ifndef TARGET_FILE
TARGET_FILE:=bin/mqtt2prometheus.$(GOOS)_$(GOARCH)
endif
install-dep:
@which dep || go get -u github.com/golang/dep/cmd/dep
Gopkg.lock: | install-dep
dep ensure --no-vendor
Gopkg.toml: | install-dep
dep init
prepare-vendor: Gopkg.toml Gopkg.lock
dep ensure -update --no-vendor
dep status
@echo "You can apply these locks via 'make apply-vendor-lock' or rollback via 'git checkout -- Gopkg.lock'"
vendor: Gopkg.toml Gopkg.lock
dep ensure -vendor-only
dep status
test:
go test ./...
build: vendor
GOOS=$(GOOS) GOARCH=$(GOARCH) go build -o $(TARGET_FILE) ./cmd
static_build: vendor
CGO_ENABLED=0 GOOS=$(GOOS) GOARCH=$(GOARCH) go build -o $(TARGET_FILE) -a -tags netgo -ldflags '-w -extldflags "-static"' ./cmd
container:
docker build -t mqtt2prometheus:latest .

119
Readme.md Normal file
View File

@@ -0,0 +1,119 @@
# MQTT2Prometheus
This exporter is an analog to the [Prometheus Pushgateway](https://github.com/prometheus/pushgateway). Clients can push
metrics via MQTT to an MQTT Broker. This exporter subscribes to the broker and
publish the received messages as prometheus metrics. I wrote this exporter to publish
metrics from small embedded sensors based on the NodeMCU to prometheus.
## Assumptions about Messages and Topics
This exporter makes some assumptions about the message format and MQTT topics. This exporter assumes that each
client publish the metrics into a dedicated topic. The last level topic becomes the `sensor` label in prometheus.
This exporter assume that the message are JSON objects with only float fields. The golang type for the messages is:
```go
type MQTTPayload map[string]float64
```
For example the message
```json
"temperature":23.20,"humidity":51.60,"heat_index":22.92}
```
published to the MQTT topic `devices/me/livingroom` becomes the following prometheus metrics:
```text
temperature{sensor="livingroom"} 23.2
heat_index{sensor="livingroom"} 22.92
humidity{sensor="livingroom"} 51.6
```
## Build
To build the exporter run:
```bash
make build
```
### Docker
To build a docker container with the mqtt2prometheus exporter included run:
```bash
make container
```
To run the container with a given config file:
```bash
docker run -it -v "$(pwd)/config.yaml:/config.yaml" -p 8002:8002 mqtt2prometheus:latest
```
## Configuration
The exporter can be configured via command line and config file.
### Commandline
Available command line flags:
```text
Usage of ./mqtt2prometheus.linux_amd64:
-config string
config file (default "config.yaml")
-listen-address string
listen address for HTTP server used to expose metrics
-listen-port string
HTTP port used to expose metrics (default "8002")
```
### Config file
The config file can look like this:
```yaml
# Settings for the MQTT Client. Currently only these three are supported
mqtt:
# The MQTT broker to connect to
server: tcp://127.0.0.1:1883
# The Topic path to subscripe to. Actually this will become `$topic_path/+`
topic_path: v1/devices/me
# The MQTT QoS level
qos: 0
cache:
# Timeout. Each received metric will be presented for this time if no update is send via MQTT
timeout: 2min
# This is a list of valid metrics. Only metrics listed here will be exported
metrics:
# The name of the metric in prometheus
- prom_name: temperature
# The name of the metric in a MQTT JSON message
mqtt_name: temperature
# The prometheus help text for this metric
help: DHT22 temperature reading
# The prometheus type for this metric. Valid values are: "gauge" and "counter"
type: gauge
# A map of string to string for constant labels. This labels will be attached to every prometheus metric
const_labels:
sensor_type: dht22
# The name of the metric in prometheus
- prom_name: humidity
# The name of the metric in a MQTT JSON message
mqtt_name: humidity
# The prometheus help text for this metric
help: DHT22 humidity reading
# The prometheus type for this metric. Valid values are: "gauge" and "counter"
type: gauge
# A map of string to string for constant labels. This labels will be attached to every prometheus metric
const_labels:
sensor_type: dht22
# The name of the metric in prometheus
- prom_name: heat_index
# The name of the metric in a MQTT JSON message
mqtt_name: heat_index
# The prometheus help text for this metric
help: DHT22 heatIndex calculation
# The prometheus type for this metric. Valid values are: "gauge" and "counter"
type: gauge
# A map of string to string for constant labels. This labels will be attached to every prometheus metric
const_labels:
sensor_type: dht22%
```

89
cmd/mqtt2prometheus.go Normal file
View File

@@ -0,0 +1,89 @@
package main
import (
"log"
"net/http"
"os"
"time"
"flag"
"github.com/eclipse/paho.mqtt.golang"
"github.com/hikhvar/mqtt2prometheus/pkg/metrics"
"github.com/hikhvar/mqtt2prometheus/pkg/mqttclient"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"fmt"
"github.com/hikhvar/mqtt2prometheus/pkg/config"
)
var (
configFlag = flag.String(
"config",
"config.yaml",
"config file",
)
portFlag = flag.String(
"listen-port",
"8002",
"HTTP port used to expose metrics",
)
addressFlag = flag.String(
"listen-address",
"0.0.0.0",
"listen address for HTTP server used to expose metrics",
)
)
func main() {
flag.Parse()
c := make(chan os.Signal, 1)
hostName, err := os.Hostname()
if err != nil {
log.Fatalf("Could not get hostname. %s\n", err.Error())
}
cfg, err := config.LoadConfig(*configFlag)
if err != nil {
log.Fatalf("Could not load config: %s\n", err.Error())
}
mqttClientOptions := mqtt.NewClientOptions()
mqttClientOptions.AddBroker(cfg.MQTT.Server).SetClientID(hostName).SetCleanSession(true)
collector := metrics.NewCollector(2*time.Minute, cfg.Metrics)
ingest := metrics.NewIngest(collector, cfg.Metrics)
var errorChan chan error
err = mqttclient.Subscribe(mqttClientOptions, mqttclient.SubscribeOptions{
Topic: cfg.MQTT.TopicPath + "/+",
QoS: cfg.MQTT.QoS,
OnMessageReceived: ingest.SetupSubscriptionHandler(errorChan),
})
if err != nil {
log.Fatalf("Could not connect to mqtt broker %s", err.Error())
}
prometheus.MustRegister(ingest.MessageMetric)
prometheus.MustRegister(collector)
http.Handle("/metrics", promhttp.Handler())
go func() {
err = http.ListenAndServe(getListenAddress(), nil)
if err != nil {
log.Fatalf("Error while serving http: %s", err.Error())
}
}()
for {
select {
case <-c:
log.Println("Terminated via Signal. Stop.")
os.Exit(0)
case err = <-errorChan:
log.Printf("Error while processing message. %s", err.Error())
}
}
}
func getListenAddress() string {
return fmt.Sprintf("%s:%s", *addressFlag, *portFlag)
}

46
config.yaml.dist Normal file
View File

@@ -0,0 +1,46 @@
# Settings for the MQTT Client. Currently only these three are supported
mqtt:
# The MQTT broker to connect to
server: tcp://127.0.0.1:1883
# The Topic path to subscripe to. Actually this will become `$topic_path/+`
topic_path: v1/devices/me
# The MQTT QoS level
qos: 0
cache:
# Timeout. Each received metric will be presented for this time if no update is send via MQTT
timeout: 2min
# This is a list of valid metrics. Only metrics listed here will be exported
metrics:
# The name of the metric in prometheus
- prom_name: temperature
# The name of the metric in a MQTT JSON message
mqtt_name: temperature
# The prometheus help text for this metric
help: DHT22 temperature reading
# The prometheus type for this metric. Valid values are: "gauge" and "counter"
type: gauge
# A map of string to string for constant labels. This labels will be attached to every prometheus metric
const_labels:
sensor_type: dht22
# The name of the metric in prometheus
- prom_name: humidity
# The name of the metric in a MQTT JSON message
mqtt_name: humidity
# The prometheus help text for this metric
help: DHT22 humidity reading
# The prometheus type for this metric. Valid values are: "gauge" and "counter"
type: gauge
# A map of string to string for constant labels. This labels will be attached to every prometheus metric
const_labels:
sensor_type: dht22
# The name of the metric in prometheus
- prom_name: heat_index
# The name of the metric in a MQTT JSON message
mqtt_name: heat_index
# The prometheus help text for this metric
help: DHT22 heatIndex calculation
# The prometheus type for this metric. Valid values are: "gauge" and "counter"
type: gauge
# A map of string to string for constant labels. This labels will be attached to every prometheus metric
const_labels:
sensor_type: dht22

83
pkg/config/config.go Normal file
View File

@@ -0,0 +1,83 @@
package config
import (
"time"
"io/ioutil"
"github.com/prometheus/client_golang/prometheus"
"gopkg.in/yaml.v2"
)
const GaugeValueType = "gauge"
const CounterValueType = "counter"
var MQTTConfigDefaults = MQTTConfig{
Server: "tcp://127.0.0.1:1883",
TopicPath: "v1/devices/me",
QoS: 0,
}
var CacheConfigDefaults = CacheConfig{
Timeout: 2 + time.Minute,
}
type Config struct {
Metrics []MetricConfig `yaml:"metrics"`
MQTT *MQTTConfig `yaml:"mqtt,omitempty"`
Cache *CacheConfig `yaml:"timeout,omitempty"`
}
type CacheConfig struct {
Timeout time.Duration `yaml:"timeout"`
}
type MQTTConfig struct {
Server string `yaml:"server"`
TopicPath string `yaml:"topic_path"`
QoS byte `yaml:"qos"`
}
// Metrics Config is a mapping between a metric send on mqtt to a prometheus metric
type MetricConfig struct {
PrometheusName string `yaml:"prom_name"`
MQTTName string `yaml:"mqtt_name"`
Help string `yaml:"help"`
ValueType string `yaml:"type"`
ConstantLabels map[string]string `yaml:"const_labels"`
}
func (mc *MetricConfig) PrometheusDescription() *prometheus.Desc {
return prometheus.NewDesc(
mc.PrometheusName, mc.Help, []string{"sensor"}, mc.ConstantLabels,
)
}
func (mc *MetricConfig) PrometheusValueType() prometheus.ValueType {
switch mc.ValueType {
case GaugeValueType:
return prometheus.GaugeValue
case CounterValueType:
return prometheus.CounterValue
default:
return prometheus.UntypedValue
}
}
func LoadConfig(configFile string) (Config, error) {
configData, err := ioutil.ReadFile(configFile)
if err != nil {
return Config{}, err
}
var cfg Config
if err = yaml.Unmarshal(configData, &cfg); err != nil {
return cfg, err
}
if cfg.MQTT == nil {
cfg.MQTT = &MQTTConfigDefaults
}
if cfg.Cache == nil {
cfg.Cache = &CacheConfigDefaults
}
return cfg, nil
}

68
pkg/metrics/collector.go Normal file
View File

@@ -0,0 +1,68 @@
package metrics
import (
"time"
"github.com/hikhvar/mqtt2prometheus/pkg/config"
gocache "github.com/patrickmn/go-cache"
"github.com/prometheus/client_golang/prometheus"
)
const DefaultTimeout = 0
type Collector interface {
prometheus.Collector
Observe(deviceID string, collection MetricCollection)
}
type MemoryCachedCollector struct {
cache *gocache.Cache
descriptions []*prometheus.Desc
}
type Metric struct {
Description *prometheus.Desc
Value float64
ValueType prometheus.ValueType
}
type MetricCollection []Metric
func NewCollector(defaultTimeout time.Duration, possibleMetrics []config.MetricConfig) Collector {
var descs []*prometheus.Desc
for _, m := range possibleMetrics {
descs = append(descs, m.PrometheusDescription())
}
return &MemoryCachedCollector{
cache: gocache.New(defaultTimeout, defaultTimeout*10),
descriptions: descs,
}
}
func (c *MemoryCachedCollector) Observe(deviceID string, collection MetricCollection) {
c.cache.Set(deviceID, collection, DefaultTimeout)
}
func (c *MemoryCachedCollector) Describe(ch chan<- *prometheus.Desc) {
for i := range c.descriptions {
ch <- c.descriptions[i]
}
}
func (c *MemoryCachedCollector) Collect(mc chan<- prometheus.Metric) {
for device, metricsRaw := range c.cache.Items() {
metrics := metricsRaw.Object.(MetricCollection)
for _, metric := range metrics {
m, err := prometheus.NewConstMetric(
metric.Description,
metric.ValueType,
metric.Value,
device,
)
if err != nil {
panic(err)
}
mc <- m
}
}
}

80
pkg/metrics/ingest.go Normal file
View File

@@ -0,0 +1,80 @@
package metrics
import (
"errors"
"path/filepath"
"encoding/json"
"fmt"
"log"
"github.com/eclipse/paho.mqtt.golang"
"github.com/hikhvar/mqtt2prometheus/pkg/config"
"github.com/prometheus/client_golang/prometheus"
)
var NoValidPayload = errors.New("no valid MQTT payload")
type Ingest struct {
validMetrics map[string]config.MetricConfig
collector Collector
MessageMetric *prometheus.CounterVec
}
func NewIngest(collector Collector, metrics []config.MetricConfig) *Ingest {
valid := make(map[string]config.MetricConfig)
for i := range metrics {
key := metrics[i].MQTTName
valid[key] = metrics[i]
}
return &Ingest{
validMetrics: valid,
collector: collector,
MessageMetric: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "received_messages",
Help: "received messages per topic and status",
}, []string{"status", "topic"},
),
}
}
type MQTTPayload map[string]float64
func (i *Ingest) store(deviceID string, rawMetrics MQTTPayload) error {
var mc MetricCollection
for metricName, value := range rawMetrics {
if cfg, found := i.validMetrics[metricName]; found {
mc = append(mc, Metric{
Description: cfg.PrometheusDescription(),
Value: value,
ValueType: cfg.PrometheusValueType(),
})
}
}
i.collector.Observe(deviceID, mc)
return nil
}
func (i *Ingest) SetupSubscriptionHandler(errChan chan<- error) mqtt.MessageHandler {
return func(c mqtt.Client, m mqtt.Message) {
log.Printf("Got message '%s' on topic %s\n", string(m.Payload()), m.Topic())
deviceId := filepath.Base(m.Topic())
var rawMetrics MQTTPayload
err := json.Unmarshal(m.Payload(), &rawMetrics)
if err != nil {
errChan <- fmt.Errorf("could not decode message '%s' on topic %s: %s", string(m.Payload()), m.Topic(), err.Error())
i.MessageMetric.WithLabelValues("decodeError", m.Topic()).Desc()
}
err = i.store(deviceId, rawMetrics)
if err != nil {
errChan <- fmt.Errorf("could not store metrics '%s' on topic %s: %s", string(m.Payload()), m.Topic(), err.Error())
i.MessageMetric.WithLabelValues("storeError", m.Topic()).Inc()
}
i.MessageMetric.WithLabelValues("success", m.Topic()).Inc()
}
}

View File

@@ -0,0 +1,29 @@
package mqttclient
import (
"log"
"github.com/eclipse/paho.mqtt.golang"
)
type SubscribeOptions struct {
Topic string
QoS byte
OnMessageReceived mqtt.MessageHandler
}
func Subscribe(connectionOptions *mqtt.ClientOptions, subscribeOptions SubscribeOptions) error {
connectionOptions.OnConnect = func(client mqtt.Client) {
log.Print("Connected to MQTT Broker.\n")
log.Printf("Will subscribe to topic %s", subscribeOptions.Topic)
if token := client.Subscribe(subscribeOptions.Topic, subscribeOptions.QoS, subscribeOptions.OnMessageReceived); token.Wait() && token.Error() != nil {
log.Printf("Could not subscribe %s\n", token.Error().Error())
}
}
client := mqtt.NewClient(connectionOptions)
if token := client.Connect(); token.Wait() && token.Error() != nil {
return token.Error()
}
return nil
}

View File

@@ -0,0 +1,9 @@
[Unit]
Description=Simple translator from mqtt messages to prometheus. Analog to pushgateway
Before=prometheus.service
[Service]
ExecStart=/opt/mqtt2prometheus/mqtt2prometheus -config /etc/mqtt2prometheus/config.yaml -port 8002
[Install]
WantedBy=multi-user.target