mirror of
https://github.com/hikhvar/mqtt2prometheus.git
synced 2026-05-19 14:57:02 +00:00
Merge pull request #24 from hikhvar/refactor-logging
Use go.uber.org/zap for logging
This commit is contained in:
3
Makefile
3
Makefile
@@ -27,6 +27,9 @@ all: build
|
||||
GO111MODULE=on
|
||||
|
||||
|
||||
lint:
|
||||
golangci-lint run
|
||||
|
||||
test:
|
||||
go test ./...
|
||||
go vet ./...
|
||||
|
||||
@@ -4,7 +4,8 @@ import (
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
"net/http"
|
||||
"os"
|
||||
"time"
|
||||
@@ -45,6 +46,12 @@ var (
|
||||
false,
|
||||
"show the builds version, date and commit",
|
||||
)
|
||||
logLevelFlag = zap.LevelFlag("log-level", zap.InfoLevel, "sets the default loglevel (default: \"info\")")
|
||||
logEncodingFlag = flag.String(
|
||||
"log-format",
|
||||
"console",
|
||||
"set the desired log output format. Valid values are 'console' and 'json'",
|
||||
)
|
||||
)
|
||||
|
||||
func main() {
|
||||
@@ -53,14 +60,16 @@ func main() {
|
||||
mustShowVersion()
|
||||
os.Exit(0)
|
||||
}
|
||||
logger := mustSetupLogger()
|
||||
defer logger.Sync() //nolint:errcheck
|
||||
c := make(chan os.Signal, 1)
|
||||
hostName, err := os.Hostname()
|
||||
if err != nil {
|
||||
log.Fatalf("Could not get hostname. %s\n", err.Error())
|
||||
logger.Fatal("Could not get hostname", zap.Error(err))
|
||||
}
|
||||
cfg, err := config.LoadConfig(*configFlag)
|
||||
if err != nil {
|
||||
log.Fatalf("Could not load config: %s\n", err.Error())
|
||||
logger.Fatal("Could not load config", zap.Error(err))
|
||||
}
|
||||
mqttClientOptions := mqtt.NewClientOptions()
|
||||
mqttClientOptions.AddBroker(cfg.MQTT.Server).SetClientID(hostName).SetCleanSession(true)
|
||||
@@ -78,12 +87,13 @@ func main() {
|
||||
Topic: cfg.MQTT.TopicPath,
|
||||
QoS: cfg.MQTT.QoS,
|
||||
OnMessageReceived: ingest.SetupSubscriptionHandler(errorChan),
|
||||
Logger: logger,
|
||||
})
|
||||
if err == nil {
|
||||
// connected, break loop
|
||||
break
|
||||
}
|
||||
log.Printf("Could not connect to mqtt broker %s, sleep 10 second", err.Error())
|
||||
logger.Warn("could not connect to mqtt broker %s, sleep 10 second", zap.Error(err))
|
||||
time.Sleep(10 * time.Second)
|
||||
}
|
||||
|
||||
@@ -93,17 +103,17 @@ func main() {
|
||||
go func() {
|
||||
err = http.ListenAndServe(getListenAddress(), nil)
|
||||
if err != nil {
|
||||
log.Fatalf("Error while serving http: %s", err.Error())
|
||||
logger.Fatal("Error while serving http", zap.Error(err))
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-c:
|
||||
log.Println("Terminated via Signal. Stop.")
|
||||
logger.Info("Terminated via Signal. Stop.")
|
||||
os.Exit(0)
|
||||
case err = <-errorChan:
|
||||
log.Printf("Error while processing message. %s", err.Error())
|
||||
logger.Error("Error while processing message", zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -137,3 +147,19 @@ func mustMQTTClientID() string {
|
||||
pid := os.Getpid()
|
||||
return fmt.Sprintf("%s-%d", host, pid)
|
||||
}
|
||||
|
||||
func mustSetupLogger() *zap.Logger {
|
||||
cfg := zap.NewProductionConfig()
|
||||
cfg.Level = zap.NewAtomicLevelAt(*logLevelFlag)
|
||||
cfg.Encoding = *logEncodingFlag
|
||||
if cfg.Encoding == "console" {
|
||||
cfg.EncoderConfig.EncodeTime = zapcore.RFC3339TimeEncoder
|
||||
}
|
||||
logger, err := cfg.Build()
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("failed to build logger: %v", err))
|
||||
}
|
||||
|
||||
config.SetProcessContext(logger)
|
||||
return logger
|
||||
}
|
||||
|
||||
3
go.mod
3
go.mod
@@ -14,6 +14,7 @@ require (
|
||||
github.com/prometheus/procfs v0.0.11
|
||||
github.com/thedevsaddam/gojsonq v2.3.0+incompatible // indirect
|
||||
github.com/thedevsaddam/gojsonq/v2 v2.5.2
|
||||
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980
|
||||
go.uber.org/zap v1.16.0
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859
|
||||
gopkg.in/yaml.v2 v2.2.5
|
||||
)
|
||||
|
||||
28
go.sum
28
go.sum
@@ -1,3 +1,4 @@
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
|
||||
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
|
||||
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
|
||||
@@ -33,9 +34,11 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
|
||||
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
|
||||
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
|
||||
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
|
||||
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
|
||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
|
||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||
@@ -75,6 +78,7 @@ github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R
|
||||
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
|
||||
github.com/prometheus/procfs v0.0.11 h1:DhHlBtkHWPYi8O2y31JkK0TF+DGM+51OopZjH/Ia5qI=
|
||||
github.com/prometheus/procfs v0.0.11/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
|
||||
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
|
||||
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
|
||||
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
@@ -87,24 +91,45 @@ github.com/thedevsaddam/gojsonq v2.3.0+incompatible h1:i2lFTvGY4LvoZ2VUzedsFlRiy
|
||||
github.com/thedevsaddam/gojsonq v2.3.0+incompatible/go.mod h1:RBcQaITThgJAAYKH7FNp2onYodRz8URfsuEGpAch0NA=
|
||||
github.com/thedevsaddam/gojsonq/v2 v2.5.2 h1:CoMVaYyKFsVj6TjU6APqAhAvC07hTI6IQen8PHzHYY0=
|
||||
github.com/thedevsaddam/gojsonq/v2 v2.5.2/go.mod h1:bv6Xa7kWy82uT0LnXPE2SzGqTj33TAEeR560MdJkiXs=
|
||||
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
|
||||
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
|
||||
go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A=
|
||||
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
|
||||
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
|
||||
go.uber.org/zap v1.16.0 h1:uFRZXykJGK9lLY4HtgSw44DnIcAM+kRBP7x5m+NpAOM=
|
||||
go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ=
|
||||
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
|
||||
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
|
||||
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20181220203305-927f97764cc3 h1:eH6Eip3UpmR+yM/qI9Ijluzb1bNv/cAU/n+6l8tRSis=
|
||||
golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980 h1:dfGZHvZk057jK2MCeWus/TowKpJ8y4AmooUzdBSR9GU=
|
||||
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200420163511-1957bb5e6d1f h1:gWF768j/LaZugp8dyS4UwsslYCYz9XgFxvlgsn0n9H8=
|
||||
golang.org/x/sys v0.0.0-20200420163511-1957bb5e6d1f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
|
||||
golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
|
||||
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
|
||||
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
|
||||
@@ -114,10 +139,13 @@ google.golang.org/protobuf v1.21.0 h1:qdOKuR/EIArgaWNjetjgTzgVTAZ+S/WXVrq9HW9zim
|
||||
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
|
||||
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
|
||||
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.2.5 h1:ymVxjfMaHvXD8RqPRmzHHsB3VvucivSkIAvJFDI5O3c=
|
||||
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
|
||||
|
||||
20
pkg/config/runtime.go
Normal file
20
pkg/config/runtime.go
Normal file
@@ -0,0 +1,20 @@
|
||||
package config
|
||||
|
||||
import "go.uber.org/zap"
|
||||
|
||||
// runtimeContext contains process global settings like the logger,
|
||||
type runtimeContext struct {
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
func (r *runtimeContext) Logger() *zap.Logger {
|
||||
return r.logger
|
||||
}
|
||||
|
||||
var ProcessContext runtimeContext
|
||||
|
||||
func SetProcessContext(logger *zap.Logger) {
|
||||
ProcessContext = runtimeContext{
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
@@ -55,16 +55,13 @@ func (c *MemoryCachedCollector) Collect(mc chan<- prometheus.Metric) {
|
||||
for device, metricsRaw := range c.cache.Items() {
|
||||
metrics := metricsRaw.Object.(MetricCollection)
|
||||
for _, metric := range metrics {
|
||||
m, err := prometheus.NewConstMetric(
|
||||
m := prometheus.MustNewConstMetric(
|
||||
metric.Description,
|
||||
metric.ValueType,
|
||||
metric.Value,
|
||||
device,
|
||||
metric.Topic,
|
||||
)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
mc <- prometheus.NewMetricWithTimestamp(metric.IngestTime, m)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@ package metrics
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"go.uber.org/zap"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
@@ -18,6 +18,7 @@ type Ingest struct {
|
||||
deviceIDRegex *config.Regexp
|
||||
collector Collector
|
||||
MessageMetric *prometheus.CounterVec
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
func NewIngest(collector Collector, metrics []config.MetricConfig, deviceIDRegex *config.Regexp) *Ingest {
|
||||
@@ -36,6 +37,7 @@ func NewIngest(collector Collector, metrics []config.MetricConfig, deviceIDRegex
|
||||
Help: "received messages per topic and status",
|
||||
}, []string{"status", "topic"},
|
||||
),
|
||||
logger: config.ProcessContext.Logger(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -127,8 +129,7 @@ func (i *Ingest) parseMetric(metricPath string, deviceID string, value interface
|
||||
|
||||
func (i *Ingest) SetupSubscriptionHandler(errChan chan<- error) mqtt.MessageHandler {
|
||||
return func(c mqtt.Client, m mqtt.Message) {
|
||||
log.Printf("Got message '%s' on topic %s\n", string(m.Payload()), m.Topic())
|
||||
|
||||
i.logger.Debug("Got message", zap.String("topic", m.Topic()), zap.String("payload", string(m.Payload())))
|
||||
err := i.store(m.Topic(), m.Payload())
|
||||
if err != nil {
|
||||
errChan <- fmt.Errorf("could not store metrics '%s' on topic %s: %s", string(m.Payload()), m.Topic(), err.Error())
|
||||
|
||||
@@ -1,23 +1,24 @@
|
||||
package mqttclient
|
||||
|
||||
import (
|
||||
"log"
|
||||
|
||||
"github.com/eclipse/paho.mqtt.golang"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type SubscribeOptions struct {
|
||||
Topic string
|
||||
QoS byte
|
||||
OnMessageReceived mqtt.MessageHandler
|
||||
Logger *zap.Logger
|
||||
}
|
||||
|
||||
func Subscribe(connectionOptions *mqtt.ClientOptions, subscribeOptions SubscribeOptions) error {
|
||||
connectionOptions.OnConnect = func(client mqtt.Client) {
|
||||
log.Print("Connected to MQTT Broker.\n")
|
||||
log.Printf("Will subscribe to topic %s", subscribeOptions.Topic)
|
||||
logger := subscribeOptions.Logger
|
||||
logger.Info("Connected to MQTT Broker")
|
||||
logger.Info("Will subscribe to topic", zap.String("topic", subscribeOptions.Topic))
|
||||
if token := client.Subscribe(subscribeOptions.Topic, subscribeOptions.QoS, subscribeOptions.OnMessageReceived); token.Wait() && token.Error() != nil {
|
||||
log.Printf("Could not subscribe %s\n", token.Error().Error())
|
||||
logger.Error("Could not subscribe", zap.Error(token.Error()))
|
||||
}
|
||||
}
|
||||
client := mqtt.NewClient(connectionOptions)
|
||||
|
||||
Reference in New Issue
Block a user