Add mqtt2prometheus_connected gauge to represent connection state

This commit is contained in:
Christoph Petrausch
2021-03-12 21:22:27 +01:00
parent bad19409eb
commit 30a2180cfa
3 changed files with 38 additions and 6 deletions

View File

@@ -73,6 +73,7 @@ func main() {
}
mqttClientOptions := mqtt.NewClientOptions()
mqttClientOptions.AddBroker(cfg.MQTT.Server).SetCleanSession(true)
mqttClientOptions.SetAutoReconnect(true)
mqttClientOptions.SetUsername(cfg.MQTT.User)
mqttClientOptions.SetPassword(cfg.MQTT.Password)
@@ -96,6 +97,8 @@ func main() {
logger.Fatal("could not setup a metric extractor", zap.Error(err))
}
ingest := metrics.NewIngest(collector, extractor, cfg.MQTT.DeviceIDRegex)
mqttClientOptions.SetOnConnectHandler(ingest.OnConnectHandler)
mqttClientOptions.SetConnectionLostHandler(ingest.ConnectionLostHandler)
errorChan := make(chan error, 1)
for {
@@ -113,7 +116,7 @@ func main() {
time.Sleep(10 * time.Second)
}
prometheus.MustRegister(ingest.MessageMetric())
prometheus.MustRegister(ingest.Collector())
prometheus.MustRegister(collector)
http.Handle("/metrics", promhttp.Handler())
go func() {

View File

@@ -1,6 +1,9 @@
package metrics
import "github.com/prometheus/client_golang/prometheus"
import (
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/prometheus/client_golang/prometheus"
)
const (
storeError = "storeError"
@@ -14,14 +17,30 @@ var defaultInstrumentation = instrumentation{
Help: "received messages per topic and status",
}, []string{"status", "topic"},
),
connectedMetric: prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "mqtt2prometheus_connected",
Help: "is the mqtt2prometheus exporter connected to the broker",
},
),
}
type instrumentation struct {
messageMetric *prometheus.CounterVec
messageMetric *prometheus.CounterVec
connectedMetric prometheus.Gauge
}
func (i *instrumentation) MessageMetric() *prometheus.CounterVec {
return i.messageMetric
func (i *instrumentation) Collector() prometheus.Collector {
return i
}
func (i *instrumentation) Describe(desc chan<- *prometheus.Desc) {
prometheus.DescribeByCollect(i, desc)
}
func (i *instrumentation) Collect(metrics chan<- prometheus.Metric) {
i.connectedMetric.Collect(metrics)
i.messageMetric.Collect(metrics)
}
func (i *instrumentation) CountSuccess(topic string) {
@@ -31,3 +50,11 @@ func (i *instrumentation) CountSuccess(topic string) {
func (i *instrumentation) CountStoreError(topic string) {
i.messageMetric.WithLabelValues(storeError, topic).Inc()
}
func (i *instrumentation) ConnectionLostHandler(client mqtt.Client, err error) {
i.connectedMetric.Set(0)
}
func (i *instrumentation) OnConnectHandler(client mqtt.Client) {
i.connectedMetric.Set(1)
}

View File

@@ -1,7 +1,7 @@
package mqttclient
import (
"github.com/eclipse/paho.mqtt.golang"
mqtt "github.com/eclipse/paho.mqtt.golang"
"go.uber.org/zap"
)
@@ -13,8 +13,10 @@ type SubscribeOptions struct {
}
func Subscribe(connectionOptions *mqtt.ClientOptions, subscribeOptions SubscribeOptions) error {
oldConnect := connectionOptions.OnConnect
connectionOptions.OnConnect = func(client mqtt.Client) {
logger := subscribeOptions.Logger
oldConnect(client)
logger.Info("Connected to MQTT Broker")
logger.Info("Will subscribe to topic", zap.String("topic", subscribeOptions.Topic))
if token := client.Subscribe(subscribeOptions.Topic, subscribeOptions.QoS, subscribeOptions.OnMessageReceived); token.Wait() && token.Error() != nil {