mirror of
https://github.com/hikhvar/mqtt2prometheus.git
synced 2026-02-14 09:59:52 +00:00
Use go.uber.org/zap for logging
As mentioned in https://github.com/hikhvar/mqtt2prometheus/issues/23, we do not use any logging framework at all. This was fine for getting the exporter startet. However, with inreasing load the logging must be configureable. This PR is a start to replace all instances of "log.Printf" with the zap logger. The current configuration parameters are the log level and the log format (console, json). We might expose the log configuration to the config file. But I think this is overkill for the current state of the exporter.
This commit is contained in:
@@ -4,7 +4,8 @@ import (
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
"net/http"
|
||||
"os"
|
||||
"time"
|
||||
@@ -45,6 +46,12 @@ var (
|
||||
false,
|
||||
"show the builds version, date and commit",
|
||||
)
|
||||
logLevelFlag = zap.LevelFlag("log-level", zap.InfoLevel, "sets the default loglevel (default: \"info\")")
|
||||
logEncodingFlag = flag.String(
|
||||
"log-format",
|
||||
"console",
|
||||
"set the desired log output format. Valid values are 'console' and 'json'",
|
||||
)
|
||||
)
|
||||
|
||||
func main() {
|
||||
@@ -53,14 +60,16 @@ func main() {
|
||||
mustShowVersion()
|
||||
os.Exit(0)
|
||||
}
|
||||
logger := mustSetupLogger()
|
||||
defer logger.Sync()
|
||||
c := make(chan os.Signal, 1)
|
||||
hostName, err := os.Hostname()
|
||||
if err != nil {
|
||||
log.Fatalf("Could not get hostname. %s\n", err.Error())
|
||||
logger.Fatal("Could not get hostname", zap.Error(err))
|
||||
}
|
||||
cfg, err := config.LoadConfig(*configFlag)
|
||||
if err != nil {
|
||||
log.Fatalf("Could not load config: %s\n", err.Error())
|
||||
logger.Fatal("Could not load config", zap.Error(err))
|
||||
}
|
||||
mqttClientOptions := mqtt.NewClientOptions()
|
||||
mqttClientOptions.AddBroker(cfg.MQTT.Server).SetClientID(hostName).SetCleanSession(true)
|
||||
@@ -78,12 +87,13 @@ func main() {
|
||||
Topic: cfg.MQTT.TopicPath,
|
||||
QoS: cfg.MQTT.QoS,
|
||||
OnMessageReceived: ingest.SetupSubscriptionHandler(errorChan),
|
||||
Logger: logger,
|
||||
})
|
||||
if err == nil {
|
||||
// connected, break loop
|
||||
break
|
||||
}
|
||||
log.Printf("Could not connect to mqtt broker %s, sleep 10 second", err.Error())
|
||||
logger.Warn("could not connect to mqtt broker %s, sleep 10 second", zap.Error(err))
|
||||
time.Sleep(10 * time.Second)
|
||||
}
|
||||
|
||||
@@ -93,17 +103,17 @@ func main() {
|
||||
go func() {
|
||||
err = http.ListenAndServe(getListenAddress(), nil)
|
||||
if err != nil {
|
||||
log.Fatalf("Error while serving http: %s", err.Error())
|
||||
logger.Fatal("Error while serving http", zap.Error(err))
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-c:
|
||||
log.Println("Terminated via Signal. Stop.")
|
||||
logger.Info("Terminated via Signal. Stop.")
|
||||
os.Exit(0)
|
||||
case err = <-errorChan:
|
||||
log.Printf("Error while processing message. %s", err.Error())
|
||||
logger.Error("Error while processing message", zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -137,3 +147,19 @@ func mustMQTTClientID() string {
|
||||
pid := os.Getpid()
|
||||
return fmt.Sprintf("%s-%d", host, pid)
|
||||
}
|
||||
|
||||
func mustSetupLogger() *zap.Logger {
|
||||
cfg := zap.NewProductionConfig()
|
||||
cfg.Level = zap.NewAtomicLevelAt(*logLevelFlag)
|
||||
cfg.Encoding = *logEncodingFlag
|
||||
if cfg.Encoding == "console" {
|
||||
cfg.EncoderConfig.EncodeTime = zapcore.RFC3339TimeEncoder
|
||||
}
|
||||
logger, err := cfg.Build()
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("failed to build logger: %v", err))
|
||||
}
|
||||
|
||||
config.SetProcessContext(logger)
|
||||
return logger
|
||||
}
|
||||
|
||||
3
go.mod
3
go.mod
@@ -14,6 +14,7 @@ require (
|
||||
github.com/prometheus/procfs v0.0.11
|
||||
github.com/thedevsaddam/gojsonq v2.3.0+incompatible // indirect
|
||||
github.com/thedevsaddam/gojsonq/v2 v2.5.2
|
||||
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980
|
||||
go.uber.org/zap v1.16.0
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859
|
||||
gopkg.in/yaml.v2 v2.2.5
|
||||
)
|
||||
|
||||
28
go.sum
28
go.sum
@@ -1,3 +1,4 @@
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
|
||||
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
|
||||
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
|
||||
@@ -33,9 +34,11 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
|
||||
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
|
||||
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
|
||||
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
|
||||
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
|
||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
|
||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||
@@ -75,6 +78,7 @@ github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R
|
||||
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
|
||||
github.com/prometheus/procfs v0.0.11 h1:DhHlBtkHWPYi8O2y31JkK0TF+DGM+51OopZjH/Ia5qI=
|
||||
github.com/prometheus/procfs v0.0.11/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
|
||||
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
|
||||
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
|
||||
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
@@ -87,24 +91,45 @@ github.com/thedevsaddam/gojsonq v2.3.0+incompatible h1:i2lFTvGY4LvoZ2VUzedsFlRiy
|
||||
github.com/thedevsaddam/gojsonq v2.3.0+incompatible/go.mod h1:RBcQaITThgJAAYKH7FNp2onYodRz8URfsuEGpAch0NA=
|
||||
github.com/thedevsaddam/gojsonq/v2 v2.5.2 h1:CoMVaYyKFsVj6TjU6APqAhAvC07hTI6IQen8PHzHYY0=
|
||||
github.com/thedevsaddam/gojsonq/v2 v2.5.2/go.mod h1:bv6Xa7kWy82uT0LnXPE2SzGqTj33TAEeR560MdJkiXs=
|
||||
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
|
||||
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
|
||||
go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A=
|
||||
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
|
||||
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
|
||||
go.uber.org/zap v1.16.0 h1:uFRZXykJGK9lLY4HtgSw44DnIcAM+kRBP7x5m+NpAOM=
|
||||
go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ=
|
||||
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
|
||||
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
|
||||
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20181220203305-927f97764cc3 h1:eH6Eip3UpmR+yM/qI9Ijluzb1bNv/cAU/n+6l8tRSis=
|
||||
golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980 h1:dfGZHvZk057jK2MCeWus/TowKpJ8y4AmooUzdBSR9GU=
|
||||
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200420163511-1957bb5e6d1f h1:gWF768j/LaZugp8dyS4UwsslYCYz9XgFxvlgsn0n9H8=
|
||||
golang.org/x/sys v0.0.0-20200420163511-1957bb5e6d1f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
|
||||
golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
|
||||
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
|
||||
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
|
||||
@@ -114,10 +139,13 @@ google.golang.org/protobuf v1.21.0 h1:qdOKuR/EIArgaWNjetjgTzgVTAZ+S/WXVrq9HW9zim
|
||||
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
|
||||
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
|
||||
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.2.5 h1:ymVxjfMaHvXD8RqPRmzHHsB3VvucivSkIAvJFDI5O3c=
|
||||
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
|
||||
|
||||
20
pkg/config/runtime.go
Normal file
20
pkg/config/runtime.go
Normal file
@@ -0,0 +1,20 @@
|
||||
package config
|
||||
|
||||
import "go.uber.org/zap"
|
||||
|
||||
// runtimeContext contains process global settings like the logger,
|
||||
type runtimeContext struct {
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
func (r *runtimeContext) Logger() *zap.Logger {
|
||||
return r.logger
|
||||
}
|
||||
|
||||
var ProcessContext runtimeContext
|
||||
|
||||
func SetProcessContext(logger *zap.Logger) {
|
||||
ProcessContext = runtimeContext{
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
@@ -55,16 +55,13 @@ func (c *MemoryCachedCollector) Collect(mc chan<- prometheus.Metric) {
|
||||
for device, metricsRaw := range c.cache.Items() {
|
||||
metrics := metricsRaw.Object.(MetricCollection)
|
||||
for _, metric := range metrics {
|
||||
m, err := prometheus.NewConstMetric(
|
||||
m := prometheus.MustNewConstMetric(
|
||||
metric.Description,
|
||||
metric.ValueType,
|
||||
metric.Value,
|
||||
device,
|
||||
metric.Topic,
|
||||
)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
mc <- prometheus.NewMetricWithTimestamp(metric.IngestTime, m)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@ package metrics
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"go.uber.org/zap"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
@@ -18,6 +18,7 @@ type Ingest struct {
|
||||
deviceIDRegex *config.Regexp
|
||||
collector Collector
|
||||
MessageMetric *prometheus.CounterVec
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
func NewIngest(collector Collector, metrics []config.MetricConfig, deviceIDRegex *config.Regexp) *Ingest {
|
||||
@@ -36,6 +37,7 @@ func NewIngest(collector Collector, metrics []config.MetricConfig, deviceIDRegex
|
||||
Help: "received messages per topic and status",
|
||||
}, []string{"status", "topic"},
|
||||
),
|
||||
logger: config.ProcessContext.Logger(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -127,8 +129,7 @@ func (i *Ingest) parseMetric(metricPath string, deviceID string, value interface
|
||||
|
||||
func (i *Ingest) SetupSubscriptionHandler(errChan chan<- error) mqtt.MessageHandler {
|
||||
return func(c mqtt.Client, m mqtt.Message) {
|
||||
log.Printf("Got message '%s' on topic %s\n", string(m.Payload()), m.Topic())
|
||||
|
||||
i.logger.Debug("Got message", zap.String("topic", m.Topic()), zap.String("payload", string(m.Payload())))
|
||||
err := i.store(m.Topic(), m.Payload())
|
||||
if err != nil {
|
||||
errChan <- fmt.Errorf("could not store metrics '%s' on topic %s: %s", string(m.Payload()), m.Topic(), err.Error())
|
||||
|
||||
@@ -1,23 +1,24 @@
|
||||
package mqttclient
|
||||
|
||||
import (
|
||||
"log"
|
||||
|
||||
"github.com/eclipse/paho.mqtt.golang"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type SubscribeOptions struct {
|
||||
Topic string
|
||||
QoS byte
|
||||
OnMessageReceived mqtt.MessageHandler
|
||||
Logger *zap.Logger
|
||||
}
|
||||
|
||||
func Subscribe(connectionOptions *mqtt.ClientOptions, subscribeOptions SubscribeOptions) error {
|
||||
connectionOptions.OnConnect = func(client mqtt.Client) {
|
||||
log.Print("Connected to MQTT Broker.\n")
|
||||
log.Printf("Will subscribe to topic %s", subscribeOptions.Topic)
|
||||
logger := subscribeOptions.Logger
|
||||
logger.Info("Connected to MQTT Broker")
|
||||
logger.Info("Will subscribe to topic", zap.String("topic", subscribeOptions.Topic))
|
||||
if token := client.Subscribe(subscribeOptions.Topic, subscribeOptions.QoS, subscribeOptions.OnMessageReceived); token.Wait() && token.Error() != nil {
|
||||
log.Printf("Could not subscribe %s\n", token.Error().Error())
|
||||
logger.Error("Could not subscribe", zap.Error(token.Error()))
|
||||
}
|
||||
}
|
||||
client := mqtt.NewClient(connectionOptions)
|
||||
|
||||
Reference in New Issue
Block a user