🌱 Bump github.com/mochi-mqtt/server/v2 from 2.6.6 to 2.7.9 (#1100)

Bumps [github.com/mochi-mqtt/server/v2](https://github.com/mochi-mqtt/server) from 2.6.6 to 2.7.9.
- [Release notes](https://github.com/mochi-mqtt/server/releases)
- [Commits](https://github.com/mochi-mqtt/server/compare/v2.6.6...v2.7.9)

---
updated-dependencies:
- dependency-name: github.com/mochi-mqtt/server/v2
  dependency-version: 2.7.9
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
This commit is contained in:
dependabot[bot]
2025-07-30 00:59:03 +00:00
committed by GitHub
parent 177af8e4c4
commit 9311ed7abf
6 changed files with 49 additions and 15 deletions

2
go.mod
View File

@@ -14,7 +14,7 @@ require (
github.com/ghodss/yaml v1.0.0
github.com/google/cel-go v0.25.0
github.com/google/go-cmp v0.7.0
github.com/mochi-mqtt/server/v2 v2.6.6
github.com/mochi-mqtt/server/v2 v2.7.9
github.com/onsi/ginkgo/v2 v2.23.4
github.com/onsi/gomega v1.36.3
github.com/openshift/api v0.0.0-20250710004639-926605d3338b

4
go.sum
View File

@@ -237,8 +237,8 @@ github.com/moby/sys/userns v0.1.0 h1:tVLXkFOxVu9A64/yh59slHVv9ahO9UIev4JZusOLG/g
github.com/moby/sys/userns v0.1.0/go.mod h1:IHUYgu/kao6N8YZlp9Cf444ySSvCmDlmzUcYfDHOl28=
github.com/moby/term v0.5.2 h1:6qk3FJAFDs6i/q3W/pQ97SX192qKfZgGjCQqfCJkgzQ=
github.com/moby/term v0.5.2/go.mod h1:d3djjFCrjnB+fl8NJux+EJzu0msscUP+f8it8hPkFLc=
github.com/mochi-mqtt/server/v2 v2.6.6 h1:FmL5ebeIIA+AKo/nX0DF8Yc2MMWFLQCwh3FZBEmg6dQ=
github.com/mochi-mqtt/server/v2 v2.6.6/go.mod h1:TqztjKGO0/ArOjJt9x9idk0kqPT3CVN8Pb+l+PS5Gdo=
github.com/mochi-mqtt/server/v2 v2.7.9 h1:y0g4vrSLAag7T07l2oCzOa/+nKVLoazKEWAArwqBNYI=
github.com/mochi-mqtt/server/v2 v2.7.9/go.mod h1:lZD3j35AVNqJL5cezlnSkuG05c0FCHSsfAKSPBOSbqc=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=

View File

@@ -150,7 +150,7 @@ type ClientState struct {
disconnected int64 // the time the client disconnected in unix time, for calculating expiry
outbound chan *packets.Packet // queue for pending outbound packets
endOnce sync.Once // only end once
isTakenOver uint32 // used to identify orphaned clients
isTakenOver atomic.Bool // used to identify orphaned clients
packetID uint32 // the current highest packetID
open context.Context // indicate that the client is open for packet exchange
cancelOpen context.CancelFunc // cancel function for open context
@@ -427,6 +427,10 @@ func (cl *Client) Closed() bool {
return cl.State.open == nil || cl.State.open.Err() != nil
}
func (cl *Client) IsTakenOver() bool {
return cl.State.isTakenOver.Load()
}
// ReadFixedHeader reads in the values of the next packet's fixed header.
func (cl *Client) ReadFixedHeader(fh *packets.FixedHeader) error {
if cl.Net.bconn == nil {
@@ -529,7 +533,11 @@ func (cl *Client) WritePacket(pk packets.Packet) error {
}
if pk.Expiry > 0 {
pk.Properties.MessageExpiryInterval = uint32(pk.Expiry - time.Now().Unix()) // [MQTT-3.3.2-6]
expiry := pk.Expiry - time.Now().Unix()
if expiry < 1 {
expiry = 1
}
pk.Properties.MessageExpiryInterval = uint32(expiry) // [MQTT-3.3.2-6]
}
pk.ProtocolVersion = cl.Properties.ProtocolVersion

View File

@@ -405,6 +405,8 @@ func (h *Hooks) OnPublish(cl *Client, pk packets.Packet) (pkx packets.Packet, er
"hook", hook.ID(),
"packet", pkx)
return pk, err
} else if errors.Is(err, packets.CodeSuccessIgnore) {
return pk, err
}
h.Log.Error("publish packet error",
"error", err,

View File

@@ -27,7 +27,7 @@ import (
)
const (
Version = "2.6.6" // the current server version.
Version = "2.7.9" // the current server version.
defaultSysTopicInterval int64 = 1 // the interval between $SYS topic publishes
LocalListener = "local"
InlineClientId = "inline"
@@ -485,7 +485,7 @@ func (s *Server) attachClient(cl *Client, listener string) error {
expire := (cl.Properties.ProtocolVersion == 5 && cl.Properties.Props.SessionExpiryInterval == 0) || (cl.Properties.ProtocolVersion < 5 && cl.Properties.Clean)
s.hooks.OnDisconnect(cl, err, expire)
if expire && atomic.LoadUint32(&cl.State.isTakenOver) == 0 {
if expire && !cl.IsTakenOver() {
cl.ClearInflights()
s.UnsubscribeClient(cl)
s.Clients.Delete(cl.ID) // [MQTT-4.1.0-2] ![MQTT-3.1.2-23]
@@ -565,11 +565,11 @@ func (s *Server) inheritClientSession(pk packets.Packet, cl *Client) bool {
if pk.Connect.Clean || (existing.Properties.Clean && existing.Properties.ProtocolVersion < 5) { // [MQTT-3.1.2-4] [MQTT-3.1.4-4]
s.UnsubscribeClient(existing)
existing.ClearInflights()
atomic.StoreUint32(&existing.State.isTakenOver, 1) // only set isTakenOver after unsubscribe has occurred
return false // [MQTT-3.2.2-3]
existing.State.isTakenOver.Store(true) // only set isTakenOver after unsubscribe has occurred
return false // [MQTT-3.2.2-3]
}
atomic.StoreUint32(&existing.State.isTakenOver, 1)
existing.State.isTakenOver.Store(true)
if existing.State.Inflight.Len() > 0 {
cl.State.Inflight = existing.State.Inflight.Clone() // [MQTT-3.1.2-5]
if cl.State.Inflight.maximumReceiveQuota == 0 && cl.ops.options.Capabilities.ReceiveMaximum != 0 {
@@ -885,6 +885,11 @@ func (s *Server) processPublish(cl *Client, pk packets.Packet) error {
pk.Origin = cl.ID
pk.Created = time.Now().Unix()
if expiry := minimum(s.Options.Capabilities.MaximumMessageExpiryInterval,
int64(pk.Properties.MessageExpiryInterval)); expiry > 0 {
pk.Expiry = pk.Created + expiry
}
if !cl.Net.Inline {
if pki, ok := cl.State.Inflight.Get(pk.PacketID); ok {
if pki.FixedHeader.Type == packets.Pubrec { // [MQTT-4.3.3-10]
@@ -986,9 +991,11 @@ func (s *Server) publishToSubscribers(pk packets.Packet) {
pk.Created = time.Now().Unix()
}
pk.Expiry = pk.Created + s.Options.Capabilities.MaximumMessageExpiryInterval
if pk.Properties.MessageExpiryInterval > 0 {
pk.Expiry = pk.Created + int64(pk.Properties.MessageExpiryInterval)
if pk.Expiry == 0 {
if expiry := minimum(s.Options.Capabilities.MaximumMessageExpiryInterval,
int64(pk.Properties.MessageExpiryInterval)); expiry > 0 {
pk.Expiry = pk.Created + expiry
}
}
subscribers := s.Topics.Subscribers(pk.TopicName)
@@ -1358,7 +1365,7 @@ func (s *Server) UnsubscribeClient(cl *Client) {
cl.State.Subscriptions.Delete(k)
}
if atomic.LoadUint32(&cl.State.isTakenOver) == 1 {
if cl.IsTakenOver() {
return
}
@@ -1755,3 +1762,20 @@ func (s *Server) sendDelayedLWT(dt int64) {
func Int64toa(v int64) string {
return strconv.FormatInt(v, 10)
}
// minimum differs from built-in min, it returns minimum of the non-zero value a and b.
// If both a and b are zero value, it reutrns 0.
func minimum(a, b int64) (m int64) {
if a != 0 {
m = a
if b != 0 && b < a {
m = b
}
return
}
if b != 0 {
m = b
}
return
}

2
vendor/modules.txt vendored
View File

@@ -366,7 +366,7 @@ github.com/mitchellh/reflectwalk
## explicit; go 1.19
# github.com/moby/sys/user v0.4.0
## explicit; go 1.17
# github.com/mochi-mqtt/server/v2 v2.6.6
# github.com/mochi-mqtt/server/v2 v2.7.9
## explicit; go 1.21
github.com/mochi-mqtt/server/v2
github.com/mochi-mqtt/server/v2/hooks/auth