fix: pull proxy block

This commit is contained in:
langhuihui
2025-03-10 13:04:01 +08:00
parent 4a52cc89bc
commit 6fa5aba7ff
2 changed files with 36 additions and 17 deletions
+15 -4
View File
@@ -43,6 +43,11 @@ func (d *RTSPPullProxy) Start() (err error) {
return d.TCPPullProxy.Start()
}
func (d *RTSPPullProxy) Dispose() {
d.conn.NetConnection.Dispose()
d.TCPPullProxy.Dispose()
}
func (d *RTSPPullProxy) GetTickInterval() time.Duration {
return time.Second * 5
}
@@ -50,11 +55,17 @@ func (d *RTSPPullProxy) GetTickInterval() time.Duration {
func (d *RTSPPullProxy) Tick(any) {
switch d.PullProxy.Status {
case m7s.PullProxyStatusOffline:
err := d.conn.Connect(d.PullProxy.URL)
if err != nil {
return
if d.Connecting.CompareAndSwap(false, true) {
// 防止阻塞
go func() {
err := d.conn.Connect(d.PullProxy.URL)
if err != nil {
return
}
d.PullProxy.ChangeStatus(m7s.PullProxyStatusOnline)
d.Connecting.Store(false)
}()
}
d.PullProxy.ChangeStatus(m7s.PullProxyStatusOnline)
case m7s.PullProxyStatusOnline, m7s.PullProxyStatusPulling:
t := time.Now()
err := d.conn.Options()
+21 -13
View File
@@ -7,6 +7,7 @@ import (
"net/url"
"path/filepath"
"strings"
"sync/atomic"
"time"
"github.com/mcuadros/go-defaults"
@@ -63,8 +64,9 @@ type (
}
TCPPullProxy struct {
PullProxyTask
TCPAddr *net.TCPAddr
URL *url.URL
TCPAddr *net.TCPAddr
URL *url.URL
Connecting atomic.Bool
}
)
@@ -200,16 +202,22 @@ func (d *TCPPullProxy) GetTickInterval() time.Duration {
}
func (d *TCPPullProxy) Tick(any) {
startTime := time.Now()
conn, err := net.DialTCP("tcp", nil, d.TCPAddr)
if err != nil {
d.PullProxy.ChangeStatus(PullProxyStatusOffline)
return
}
conn.Close()
d.PullProxy.RTT = time.Since(startTime)
if d.PullProxy.Status == PullProxyStatusOffline {
d.PullProxy.ChangeStatus(PullProxyStatusOnline)
switch d.PullProxy.Status {
case PullProxyStatusOffline:
if d.Connecting.CompareAndSwap(false, true) {
go func() {
defer d.Connecting.Store(false)
startTime := time.Now()
conn, err := net.DialTCP("tcp", nil, d.TCPAddr)
if err != nil {
d.PullProxy.ChangeStatus(PullProxyStatusOffline)
return
}
conn.Close()
d.PullProxy.RTT = time.Since(startTime)
d.PullProxy.ChangeStatus(PullProxyStatusOnline)
}()
}
}
}
@@ -413,7 +421,7 @@ func (s *Server) RemovePullProxy(ctx context.Context, req *pb.RequestWithId) (re
})
return
} else if req.StreamPath != "" {
var deviceList []PullProxy
var deviceList []*PullProxy
s.DB.Find(&deviceList, "stream_path=?", req.StreamPath)
if len(deviceList) > 0 {
for _, device := range deviceList {