Add PublishDropped metrics (#167)

* Add PublishDropped

* Add PublishDropped

* Add PublishDropped

* Update storage_test.go

* Update system.go

* Update server.go
This commit is contained in:
thedevop
2023-02-10 06:44:01 -08:00
committed by GitHub
parent aac245441a
commit 4b039cb35c
4 changed files with 8 additions and 1 deletions
+2 -1
View File
@@ -104,6 +104,7 @@ var (
ClientsMaximum: 7,
MessagesReceived: 10,
MessagesSent: 11,
PublishDropped: 20,
PacketsReceived: 12,
PacketsSent: 13,
Retained: 15,
@@ -111,7 +112,7 @@ var (
InflightDropped: 17,
},
}
sysInfoJSON = []byte(`{"version":"2.0.0","started":1,"time":0,"uptime":2,"bytes_received":3,"bytes_sent":4,"clients_connected":5,"clients_disconnected":0,"clients_maximum":7,"clients_total":0,"messages_received":10,"messages_sent":11,"retained":15,"inflight":16,"inflight_dropped":17,"subscriptions":0,"packets_received":12,"packets_sent":13,"memory_alloc":0,"threads":0,"t":"info","id":"id"}`)
sysInfoJSON = []byte(`{"version":"2.0.0","started":1,"time":0,"uptime":2,"bytes_received":3,"bytes_sent":4,"clients_connected":5,"clients_disconnected":0,"clients_maximum":7,"clients_total":0,"messages_received":10,"messages_sent":11,"publish_dropped":20,"retained":15,"inflight":16,"inflight_dropped":17,"subscriptions":0,"packets_received":12,"packets_sent":13,"memory_alloc":0,"threads":0,"t":"info","id":"id"}`)
)
func TestClientMarshalBinary(t *testing.T) {
+2
View File
@@ -831,6 +831,7 @@ func (s *Server) publishToClient(cl *Client, sub packets.Subscription, pk packet
case cl.State.outbound <- out:
atomic.AddInt32(&cl.State.outboundQty, 1)
default:
atomic.AddInt64(&s.Info.PublishDropped, 1)
cl.ops.hooks.OnPublishDropped(cl, pk)
cl.State.Inflight.Delete(out.PacketID) // packet was dropped due to irregular circumstances, so rollback inflight.
cl.State.Inflight.IncreaseSendQuota()
@@ -1324,6 +1325,7 @@ func (s *Server) loadServerInfo(v system.Info) {
atomic.StoreInt64(&s.Info.ClientsDisconnected, v.ClientsDisconnected)
atomic.StoreInt64(&s.Info.MessagesReceived, v.MessagesReceived)
atomic.StoreInt64(&s.Info.MessagesSent, v.MessagesSent)
atomic.StoreInt64(&s.Info.PublishDropped, v.PublishDropped)
atomic.StoreInt64(&s.Info.PacketsReceived, v.PacketsReceived)
atomic.StoreInt64(&s.Info.PacketsSent, v.PacketsSent)
atomic.StoreInt64(&s.Info.InflightDropped, v.InflightDropped)
+3
View File
@@ -22,6 +22,7 @@ type Info struct {
ClientsTotal int64 `json:"clients_total"` // total number of connected and disconnected clients with a persistent session currently connected and registered
MessagesReceived int64 `json:"messages_received"` // total number of publish messages received
MessagesSent int64 `json:"messages_sent"` // total number of publish messages sent
PublishDropped int64 `json:"publish_dropped"` // total number of messages dropped to slow subscriber
Retained int64 `json:"retained"` // total number of retained messages active on the broker
Inflight int64 `json:"inflight"` // the number of messages currently in-flight
InflightDropped int64 `json:"inflight_dropped"` // the number of inflight messages which were dropped
@@ -32,6 +33,7 @@ type Info struct {
Threads int64 `json:"threads"` // number of active goroutines, named as threads for platform ambiguity
}
// Clone makes a copy of Info using atomic operation
func (i *Info) Clone() *Info {
return &Info{
Version: i.Version,
@@ -46,6 +48,7 @@ func (i *Info) Clone() *Info {
ClientsDisconnected: atomic.LoadInt64(&i.ClientsDisconnected),
MessagesReceived: atomic.LoadInt64(&i.MessagesReceived),
MessagesSent: atomic.LoadInt64(&i.MessagesSent),
PublishDropped: atomic.LoadInt64(&i.PublishDropped),
Retained: atomic.LoadInt64(&i.Retained),
Inflight: atomic.LoadInt64(&i.Inflight),
InflightDropped: atomic.LoadInt64(&i.InflightDropped),
+1
View File
@@ -20,6 +20,7 @@ func TestClone(t *testing.T) {
ClientsDisconnected: 9,
MessagesReceived: 10,
MessagesSent: 11,
PublishDropped: 20,
Retained: 12,
Inflight: 13,
InflightDropped: 14,