diff --git a/cmd/mqtt2prometheus.go b/cmd/mqtt2prometheus.go index 920a821..9ca174f 100644 --- a/cmd/mqtt2prometheus.go +++ b/cmd/mqtt2prometheus.go @@ -73,6 +73,7 @@ func main() { } mqttClientOptions := mqtt.NewClientOptions() mqttClientOptions.AddBroker(cfg.MQTT.Server).SetCleanSession(true) + mqttClientOptions.SetAutoReconnect(true) mqttClientOptions.SetUsername(cfg.MQTT.User) mqttClientOptions.SetPassword(cfg.MQTT.Password) @@ -96,6 +97,8 @@ func main() { logger.Fatal("could not setup a metric extractor", zap.Error(err)) } ingest := metrics.NewIngest(collector, extractor, cfg.MQTT.DeviceIDRegex) + mqttClientOptions.SetOnConnectHandler(ingest.OnConnectHandler) + mqttClientOptions.SetConnectionLostHandler(ingest.ConnectionLostHandler) errorChan := make(chan error, 1) for { @@ -113,7 +116,7 @@ func main() { time.Sleep(10 * time.Second) } - prometheus.MustRegister(ingest.MessageMetric()) + prometheus.MustRegister(ingest.Collector()) prometheus.MustRegister(collector) http.Handle("/metrics", promhttp.Handler()) go func() { diff --git a/pkg/metrics/instrumentation.go b/pkg/metrics/instrumentation.go index 0082bdc..5dd75eb 100644 --- a/pkg/metrics/instrumentation.go +++ b/pkg/metrics/instrumentation.go @@ -1,6 +1,9 @@ package metrics -import "github.com/prometheus/client_golang/prometheus" +import ( + mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/prometheus/client_golang/prometheus" +) const ( storeError = "storeError" @@ -14,14 +17,30 @@ var defaultInstrumentation = instrumentation{ Help: "received messages per topic and status", }, []string{"status", "topic"}, ), + connectedMetric: prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "mqtt2prometheus_connected", + Help: "is the mqtt2prometheus exporter connected to the broker", + }, + ), } type instrumentation struct { - messageMetric *prometheus.CounterVec + messageMetric *prometheus.CounterVec + connectedMetric prometheus.Gauge } -func (i *instrumentation) MessageMetric() *prometheus.CounterVec { - return i.messageMetric +func (i *instrumentation) Collector() prometheus.Collector { + return i +} + +func (i *instrumentation) Describe(desc chan<- *prometheus.Desc) { + prometheus.DescribeByCollect(i, desc) +} + +func (i *instrumentation) Collect(metrics chan<- prometheus.Metric) { + i.connectedMetric.Collect(metrics) + i.messageMetric.Collect(metrics) } func (i *instrumentation) CountSuccess(topic string) { @@ -31,3 +50,11 @@ func (i *instrumentation) CountSuccess(topic string) { func (i *instrumentation) CountStoreError(topic string) { i.messageMetric.WithLabelValues(storeError, topic).Inc() } + +func (i *instrumentation) ConnectionLostHandler(client mqtt.Client, err error) { + i.connectedMetric.Set(0) +} + +func (i *instrumentation) OnConnectHandler(client mqtt.Client) { + i.connectedMetric.Set(1) +} diff --git a/pkg/mqttclient/mqttClient.go b/pkg/mqttclient/mqttClient.go index 4ac181c..2e85a8d 100644 --- a/pkg/mqttclient/mqttClient.go +++ b/pkg/mqttclient/mqttClient.go @@ -1,7 +1,7 @@ package mqttclient import ( - "github.com/eclipse/paho.mqtt.golang" + mqtt "github.com/eclipse/paho.mqtt.golang" "go.uber.org/zap" ) @@ -13,8 +13,10 @@ type SubscribeOptions struct { } func Subscribe(connectionOptions *mqtt.ClientOptions, subscribeOptions SubscribeOptions) error { + oldConnect := connectionOptions.OnConnect connectionOptions.OnConnect = func(client mqtt.Client) { logger := subscribeOptions.Logger + oldConnect(client) logger.Info("Connected to MQTT Broker") logger.Info("Will subscribe to topic", zap.String("topic", subscribeOptions.Topic)) if token := client.Subscribe(subscribeOptions.Topic, subscribeOptions.QoS, subscribeOptions.OnMessageReceived); token.Wait() && token.Error() != nil {