From 394f01ab6b8b4e8a6b40d646193f92b703d82d23 Mon Sep 17 00:00:00 2001 From: ylw Date: Wed, 22 Feb 2023 17:37:04 +0800 Subject: [PATCH] =?UTF-8?q?feat(mqtt):=20=E4=BF=AE=E6=94=B9MQTT=E9=85=8D?= =?UTF-8?q?=E7=BD=AE=E5=8F=82=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- engine/engine.go | 7 ++++--- engine/engine_helper.go | 2 +- go.mod | 3 ++- go.sum | 7 ++++--- pkg/xmqtt/mqtt.go | 7 ++++--- 5 files changed, 15 insertions(+), 11 deletions(-) diff --git a/engine/engine.go b/engine/engine.go index 49b4d8f..36d8bc5 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -3,10 +3,11 @@ package engine import ( "context" mqtt "github.com/eclipse/paho.mqtt.golang" - "github.com/panjf2000/ants" + "github.com/panjf2000/ants/v2" "github.com/pkg/errors" "github.com/yuelwish/mqtt-bridge/pkg/xmqtt" "log" + "runtime" ) const ( @@ -45,7 +46,7 @@ func (e *Engine) Dial() error { } func (e *Engine) handlerMessage(ctx context.Context) { - gPool, _ := ants.NewPool(ants.DEFAULT_ANTS_POOL_SIZE) + gPool, _ := ants.NewPool(runtime.NumCPU() * 10) defer gPool.Release() for msg := range e.MessageChan { @@ -80,7 +81,7 @@ func (e *Engine) handlerMessage(ctx context.Context) { } if err = gPool.Submit(func() { - err := xmqtt.Send(client, msg.Topic, msg.Payload) + err := xmqtt.Send(client, msg.Topic, 0, false, msg.Payload) if err != nil { log.Printf("[send message] %s ==> %v t:%s failed: %v", msg.FromTag, tTag, msg.Topic, err) } else { diff --git a/engine/engine_helper.go b/engine/engine_helper.go index be6bbfb..41124f2 100644 --- a/engine/engine_helper.go +++ b/engine/engine_helper.go @@ -142,6 +142,6 @@ func (e *EngineHelper) BuildEngine() (*Engine, error) { cliSubMap: cliSubMap, filterTree: filterTree, toTopicMap: toTopicMap, - MessageChan: make(chan *Message, 4096), + MessageChan: make(chan *Message, 1024), }, nil } diff --git a/go.mod b/go.mod index a4f9b3d..f672614 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.19 require ( github.com/eclipse/paho.mqtt.golang v1.4.2 - github.com/panjf2000/ants v1.3.0 + github.com/panjf2000/ants/v2 v2.7.1 github.com/pkg/errors v0.9.1 github.com/spf13/viper v1.15.0 ) @@ -16,6 +16,7 @@ require ( github.com/magiconair/properties v1.8.7 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/pelletier/go-toml/v2 v2.0.6 // indirect + github.com/rogpeppe/go-internal v1.9.0 // indirect github.com/spf13/afero v1.9.3 // indirect github.com/spf13/cast v1.5.0 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect diff --git a/go.sum b/go.sum index f7323f5..4e57474 100644 --- a/go.sum +++ b/go.sum @@ -140,8 +140,8 @@ github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0V github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= -github.com/panjf2000/ants v1.3.0 h1:8pQ+8leaLc9lys2viEEr8md0U4RN6uOSUCE9bOYjQ9M= -github.com/panjf2000/ants v1.3.0/go.mod h1:AaACblRPzq35m1g3enqYcxspbbiOJJYaxU2wMpm1cXY= +github.com/panjf2000/ants/v2 v2.7.1 h1:qBy5lfSdbxvrR0yUnZfaEDjf0FlCw4ufsbcsxmE7r+M= +github.com/panjf2000/ants/v2 v2.7.1/go.mod h1:KIBmYG9QQX5U2qzFP/yQJaq/nSb6rahS9iEHkrCMgM8= github.com/pelletier/go-toml/v2 v2.0.6 h1:nrzqCb7j9cDFj2coyLNLaZuJTLjWjlaz6nvTvIwycIU= github.com/pelletier/go-toml/v2 v2.0.6/go.mod h1:eumQOmlWiOPt5WriQQqoM5y18pDHwha2N+QD+EUNTek= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -151,7 +151,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= -github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/spf13/afero v1.9.3 h1:41FoI0fD7OR7mGcKE/aOiLkGreyf8ifIOQmJANWogMk= github.com/spf13/afero v1.9.3/go.mod h1:iUV7ddyEEZPO5gA3zD4fJt6iStLlL+Lg4m2cihcDf8Y= github.com/spf13/cast v1.5.0 h1:rj3WzYc11XZaIZMPKmwP96zkFEnnAmV8s6XbB2aY32w= diff --git a/pkg/xmqtt/mqtt.go b/pkg/xmqtt/mqtt.go index a8db0a4..730dc11 100644 --- a/pkg/xmqtt/mqtt.go +++ b/pkg/xmqtt/mqtt.go @@ -17,7 +17,8 @@ func Init(clientIdPrefix, addr string, optFns ...func(opt *mqtt.ClientOptions)) opts := mqtt.NewClientOptions() opts.AddBroker(addr) opts.SetClientID(clientIdPrefix + "-" + strconv.FormatInt(time.Now().UnixNano(), 36)) - opts.SetKeepAlive(10 * time.Second) + opts.SetKeepAlive(60 * time.Second) + opts.SetPingTimeout(5 * time.Second) opts.SetMaxReconnectInterval(10 * time.Second) opts.SetOnConnectHandler(func(client mqtt.Client) { @@ -69,8 +70,8 @@ func UnSubscribe(client mqtt.Client, topic ...string) error { return nil } -func Send(client mqtt.Client, topic string, payload []byte) error { - token := client.Publish(topic, 1, false, payload) +func Send(client mqtt.Client, topic string, qos byte, retained bool, payload []byte) error { + token := client.Publish(topic, qos, retained, payload) if token.Wait() && token.Error() != nil { return token.Error() }