From 6fa5aba7ff5015b753aadfe5f2be3a959d6020d7 Mon Sep 17 00:00:00 2001 From: langhuihui <178529795@qq.com> Date: Mon, 10 Mar 2025 13:04:01 +0800 Subject: [PATCH] fix: pull proxy block --- plugin/rtsp/pull-proxy.go | 19 +++++++++++++++---- pull-proxy.go | 34 +++++++++++++++++++++------------- 2 files changed, 36 insertions(+), 17 deletions(-) diff --git a/plugin/rtsp/pull-proxy.go b/plugin/rtsp/pull-proxy.go index 7b1160c..2e43356 100644 --- a/plugin/rtsp/pull-proxy.go +++ b/plugin/rtsp/pull-proxy.go @@ -43,6 +43,11 @@ func (d *RTSPPullProxy) Start() (err error) { return d.TCPPullProxy.Start() } +func (d *RTSPPullProxy) Dispose() { + d.conn.NetConnection.Dispose() + d.TCPPullProxy.Dispose() +} + func (d *RTSPPullProxy) GetTickInterval() time.Duration { return time.Second * 5 } @@ -50,11 +55,17 @@ func (d *RTSPPullProxy) GetTickInterval() time.Duration { func (d *RTSPPullProxy) Tick(any) { switch d.PullProxy.Status { case m7s.PullProxyStatusOffline: - err := d.conn.Connect(d.PullProxy.URL) - if err != nil { - return + if d.Connecting.CompareAndSwap(false, true) { + // 防止阻塞 + go func() { + err := d.conn.Connect(d.PullProxy.URL) + if err != nil { + return + } + d.PullProxy.ChangeStatus(m7s.PullProxyStatusOnline) + d.Connecting.Store(false) + }() } - d.PullProxy.ChangeStatus(m7s.PullProxyStatusOnline) case m7s.PullProxyStatusOnline, m7s.PullProxyStatusPulling: t := time.Now() err := d.conn.Options() diff --git a/pull-proxy.go b/pull-proxy.go index aec6c46..c88557a 100644 --- a/pull-proxy.go +++ b/pull-proxy.go @@ -7,6 +7,7 @@ import ( "net/url" "path/filepath" "strings" + "sync/atomic" "time" "github.com/mcuadros/go-defaults" @@ -63,8 +64,9 @@ type ( } TCPPullProxy struct { PullProxyTask - TCPAddr *net.TCPAddr - URL *url.URL + TCPAddr *net.TCPAddr + URL *url.URL + Connecting atomic.Bool } ) @@ -200,16 +202,22 @@ func (d *TCPPullProxy) GetTickInterval() time.Duration { } func (d *TCPPullProxy) Tick(any) { - startTime := time.Now() - conn, err := net.DialTCP("tcp", nil, d.TCPAddr) - if err != nil { - d.PullProxy.ChangeStatus(PullProxyStatusOffline) - return - } - conn.Close() - d.PullProxy.RTT = time.Since(startTime) - if d.PullProxy.Status == PullProxyStatusOffline { - d.PullProxy.ChangeStatus(PullProxyStatusOnline) + switch d.PullProxy.Status { + case PullProxyStatusOffline: + if d.Connecting.CompareAndSwap(false, true) { + go func() { + defer d.Connecting.Store(false) + startTime := time.Now() + conn, err := net.DialTCP("tcp", nil, d.TCPAddr) + if err != nil { + d.PullProxy.ChangeStatus(PullProxyStatusOffline) + return + } + conn.Close() + d.PullProxy.RTT = time.Since(startTime) + d.PullProxy.ChangeStatus(PullProxyStatusOnline) + }() + } } } @@ -413,7 +421,7 @@ func (s *Server) RemovePullProxy(ctx context.Context, req *pb.RequestWithId) (re }) return } else if req.StreamPath != "" { - var deviceList []PullProxy + var deviceList []*PullProxy s.DB.Find(&deviceList, "stream_path=?", req.StreamPath) if len(deviceList) > 0 { for _, device := range deviceList {