support onvif

This commit is contained in:
xugo
2025-11-23 03:57:22 +08:00
parent 08c988f4cc
commit dc83d3b278
43 changed files with 1383 additions and 689 deletions
-1
View File
@@ -10,7 +10,6 @@ RUN apk --no-cache add ca-certificates \
WORKDIR /app
ADD ./build/linux_${TARGETARCH}/bin ./
ADD ./configs/config.toml /app/configs/config.toml
ADD ./www /app/www
LABEL Name=gowvp Version=0.0.1
+11 -1
View File
@@ -3,6 +3,9 @@ FROM debian:bullseye-slim
ENV TZ=Asia/Shanghai
WORKDIR /opt/media/bin/
RUN sed -i 's|http://deb.debian.org|http://mirrors.tuna.tsinghua.edu.cn|g; \
s|http://security.debian.org|http://mirrors.tuna.tsinghua.edu.cn|g' /etc/apt/sources.list
RUN apt-get update && apt-get install -y --no-install-recommends \
ca-certificates \
tzdata \
@@ -12,14 +15,21 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
&& rm -rf /var/lib/apt/lists/*
COPY --from=zlmediakit/zlmediakit:master /opt/media/bin /opt/media/bin
COPY --from=mwader/static-ffmpeg:6.1 /ffmpeg /usr/local/bin/ffmpeg
ADD ./build/linux_amd64/bin ./gowvp
ADD ./www ./www
RUN mkdir -p configs
RUN ln -sf /usr/local/bin/ffmpeg /usr/bin/ffmpeg && \
chmod +x /usr/local/bin/ffmpeg
RUN ln -sf /usr/local/bin/ffmpeg /opt/media/bin/ffmpeg
LABEL Name=gowvp \
Version=0.0.1 \
Version=0.1.0 \
Maintainer="xx@golang.space" \
Description="gowvp & zlmediakit"
+12 -7
View File
@@ -9,7 +9,9 @@ require (
github.com/gin-gonic/gin v1.11.0
github.com/glebarez/sqlite v1.11.0
github.com/google/wire v0.7.0
github.com/ixugo/goddd v1.5.0
github.com/gowvp/onvif v0.0.11
github.com/ixugo/goddd v1.5.1
github.com/ixugo/netpulse v0.1.3
github.com/jinzhu/copier v0.4.0
github.com/pelletier/go-toml/v2 v2.2.4
github.com/shirou/gopsutil/v4 v4.25.7
@@ -20,10 +22,13 @@ require (
require (
filippo.io/edwards25519 v1.1.0 // indirect
github.com/beevik/etree v1.1.0 // indirect
github.com/bytedance/gopkg v0.1.3 // indirect
github.com/ebitengine/purego v0.8.4 // indirect
github.com/elgs/gostrgen v0.0.0-20251010065124-dce324c66371 // indirect
github.com/go-sql-driver/mysql v1.8.1 // indirect
github.com/goccy/go-yaml v1.18.0 // indirect
github.com/juju/errors v1.0.0 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/quic-go/qpack v0.5.1 // indirect
@@ -47,7 +52,7 @@ require (
github.com/goccy/go-json v0.10.5 // indirect
github.com/gofrs/uuid v4.4.0+incompatible
github.com/golang-jwt/jwt/v4 v4.5.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/google/uuid v1.6.0
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/pgx/v5 v5.7.2 // indirect
@@ -75,12 +80,12 @@ require (
go.uber.org/zap v1.27.0 // indirect
go.uber.org/zap/exp v0.3.0 // indirect
golang.org/x/arch v0.22.0 // indirect
golang.org/x/crypto v0.43.0 // indirect
golang.org/x/crypto v0.44.0 // indirect
golang.org/x/exp v0.0.0-20251009144603-d2f985daa21b // indirect
golang.org/x/net v0.46.0 // indirect
golang.org/x/sync v0.17.0 // indirect
golang.org/x/sys v0.37.0 // indirect
golang.org/x/text v0.30.0
golang.org/x/net v0.47.0 // indirect
golang.org/x/sync v0.18.0 // indirect
golang.org/x/sys v0.38.0 // indirect
golang.org/x/text v0.31.0
golang.org/x/time v0.12.0 // indirect
google.golang.org/protobuf v1.36.10 // indirect
modernc.org/libc v1.61.5 // indirect
+30 -12
View File
@@ -2,6 +2,8 @@ filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
github.com/beevik/etree v1.1.0 h1:T0xke/WvNtMoCqgzPhkX2r4rjY3GDZFi+FjpRZY2Jbs=
github.com/beevik/etree v1.1.0/go.mod h1:r8Aw8JqVegEf0w2fDnATrX9VpkMcyFeM0FhwO62wh+A=
github.com/bytedance/gopkg v0.1.3 h1:TPBSwH8RsouGCBcMBktLt1AymVo2TVsBVCY4b6TnZ/M=
github.com/bytedance/gopkg v0.1.3/go.mod h1:576VvJ+eJgyCzdjS+c4+77QF3p7ubbtiKARP3TxducM=
github.com/bytedance/sonic v1.14.1 h1:FBMC0zVz5XUmE4z9wF4Jey0An5FueFvOsTKKKtwIl7w=
@@ -17,6 +19,8 @@ github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkp
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/ebitengine/purego v0.8.4 h1:CF7LEKg5FFOsASUj0+QwaXf8Ht6TlFxg09+S9wz0omw=
github.com/ebitengine/purego v0.8.4/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ=
github.com/elgs/gostrgen v0.0.0-20251010065124-dce324c66371 h1:4WADfZZW26C7UgER4MEwZpS/THGj0VEf6HiXS3PyRfo=
github.com/elgs/gostrgen v0.0.0-20251010065124-dce324c66371/go.mod h1:qxVxKgX2MC/LcAmvASQ96hjjlcBOV6wQ+ZV9r1+nB3k=
github.com/gabriel-vasile/mimetype v1.4.10 h1:zyueNbySn/z8mJZHLt6IPw0KoZsiQNszIpU+bX4+ZK0=
github.com/gabriel-vasile/mimetype v1.4.10/go.mod h1:d+9Oxyo1wTzWdyVUPMmXFvp4F9tea18J8ufA774AB3s=
github.com/gin-contrib/cors v1.7.5 h1:cXC9SmofOrRg0w9PigwGlHG3ztswH6bqq4vJVXnvYMk=
@@ -61,8 +65,12 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/wire v0.7.0 h1:JxUKI6+CVBgCO2WToKy/nQk0sS+amI9z9EjVmdaocj4=
github.com/google/wire v0.7.0/go.mod h1:n6YbUQD9cPKTnHXEBN2DXlOp/mVADhVErcMFb0v3J18=
github.com/ixugo/goddd v1.5.0 h1:XpdhZ8zIInN2NYMMrnNmVb3BZpa1ZXqiFAu5gQDAdGA=
github.com/ixugo/goddd v1.5.0/go.mod h1:FzEjEd6uWEWan1XWTh8VXdqGtyjMYGow/URNtBY8X7w=
github.com/gowvp/onvif v0.0.11 h1:Y2ULsB9j2aPlg2/q3WpVxYU7ftWA1dOjbqtY/8ANU/c=
github.com/gowvp/onvif v0.0.11/go.mod h1:Dshr55Q/Xgwa9XMQBPBQBMOWj/2Sq+DxLhdNY35uoFc=
github.com/ixugo/goddd v1.5.1 h1:1GSFcBMTNz84PSRaUuOin4/6GSda9jc9pAzy78IwRlw=
github.com/ixugo/goddd v1.5.1/go.mod h1:FzEjEd6uWEWan1XWTh8VXdqGtyjMYGow/URNtBY8X7w=
github.com/ixugo/netpulse v0.1.3 h1:760mxad/boWr5hxY2nD0I0yfmQcoNDrlu8KKyk7jOs0=
github.com/ixugo/netpulse v0.1.3/go.mod h1:vH0zFyVMxDkz8jVHtI9/2oEb7npi/5+eSIx5RzHkN4g=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
@@ -81,9 +89,15 @@ github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST
github.com/jonboulle/clockwork v0.4.0/go.mod h1:xgRqUGwRcjKCO1vbZUEtSLrqKoPSsUpK7fnezOII0kc=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/juju/errors v1.0.0 h1:yiq7kjCLll1BiaRuNY53MGI0+EQ3rF6GB+wvboZDefM=
github.com/juju/errors v1.0.0/go.mod h1:B5x9thDqx0wIMH3+aLIMP9HjItInYWObRovoCFM5Qe8=
github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE=
github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y=
github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc h1:RKf14vYWi2ttpEmkA4aQ3j4u9dStX2t4M8UM6qqNsG8=
@@ -119,6 +133,8 @@ github.com/quic-go/quic-go v0.55.0 h1:zccPQIqYCXDt5NmcEabyYvOnomjs8Tlwl7tISjJh9M
github.com/quic-go/quic-go v0.55.0/go.mod h1:DR51ilwU1uE164KuWXhinFcKWGlEjzys2l8zUl5Ss1U=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8=
github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
github.com/shirou/gopsutil/v4 v4.25.7 h1:bNb2JuqKuAu3tRlPv5piSmBZyMfecwQ+t/ILq+1JqVM=
github.com/shirou/gopsutil/v4 v4.25.7/go.mod h1:XV/egmwJtd3ZQjBpJVY5kndsiOO4IRqy9TQnmm6VP7U=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@@ -153,23 +169,23 @@ go.uber.org/zap/exp v0.3.0 h1:6JYzdifzYkGmTdRR59oYH+Ng7k49H9qVpWwNSsGJj3U=
go.uber.org/zap/exp v0.3.0/go.mod h1:5I384qq7XGxYyByIhHm6jg5CHkGY0nsTfbDLgDDlgJQ=
golang.org/x/arch v0.22.0 h1:c/Zle32i5ttqRXjdLyyHZESLD/bB90DCU1g9l/0YBDI=
golang.org/x/arch v0.22.0/go.mod h1:dNHoOeKiyja7GTvF9NJS1l3Z2yntpQNzgrjh1cU103A=
golang.org/x/crypto v0.43.0 h1:dduJYIi3A3KOfdGOHX8AVZ/jGiyPa3IbBozJ5kNuE04=
golang.org/x/crypto v0.43.0/go.mod h1:BFbav4mRNlXJL4wNeejLpWxB7wMbc79PdRGhWKncxR0=
golang.org/x/crypto v0.44.0 h1:A97SsFvM3AIwEEmTBiaxPPTYpDC47w720rdiiUvgoAU=
golang.org/x/crypto v0.44.0/go.mod h1:013i+Nw79BMiQiMsOPcVCB5ZIJbYkerPrGnOa00tvmc=
golang.org/x/exp v0.0.0-20251009144603-d2f985daa21b h1:18qgiDvlvH7kk8Ioa8Ov+K6xCi0GMvmGfGW0sgd/SYA=
golang.org/x/exp v0.0.0-20251009144603-d2f985daa21b/go.mod h1:j/pmGrbnkbPtQfxEe5D0VQhZC6qKbfKifgD0oM7sR70=
golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA=
golang.org/x/mod v0.29.0/go.mod h1:NyhrlYXJ2H4eJiRy/WDBO6HMqZQ6q9nk4JzS3NuCK+w=
golang.org/x/net v0.46.0 h1:giFlY12I07fugqwPuWJi68oOnpfqFnJIJzaIIm2JVV4=
golang.org/x/net v0.46.0/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210=
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY=
golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU=
golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I=
golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=
golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/text v0.30.0 h1:yznKA/E9zq54KzlzBEAWn1NXSQ8DIp/NYMy88xJjl4k=
golang.org/x/text v0.30.0/go.mod h1:yDdHFIX9t+tORqspjENWgzaCVXgk0yYnYuSZ8UzzBVM=
golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc=
golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM=
golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM=
golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE=
golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg=
golang.org/x/tools v0.38.0 h1:Hx2Xv8hISq8Lm16jvBZ2VQf+RLmbd7wVUsALibYI/IQ=
@@ -178,6 +194,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE=
google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+84
View File
@@ -0,0 +1,84 @@
package gbadapter
import (
"context"
"github.com/gowvp/gb28181/internal/core/ipc"
"github.com/gowvp/gb28181/internal/core/sms"
"github.com/gowvp/gb28181/pkg/gbs"
)
var _ ipc.Protocoler = (*Adapter)(nil)
type Adapter struct {
adapter ipc.Adapter
gbs *gbs.Server
smsCore sms.Core
}
// DeleteDevice implements ipc.Protocoler.
func (a *Adapter) DeleteDevice(ctx context.Context, device *ipc.Device) error {
return nil
}
func NewAdapter(adapter ipc.Adapter, gbs *gbs.Server, smsCore sms.Core) *Adapter {
return &Adapter{adapter: adapter, gbs: gbs, smsCore: smsCore}
}
// InitDevice implements ipc.Protocoler.
func (a *Adapter) InitDevice(ctx context.Context, device *ipc.Device) error {
panic("unimplemented")
}
// OnStreamChanged implements ipc.Protocoler.
func (a *Adapter) OnStreamChanged(ctx context.Context, stream string) error {
ch, err := a.adapter.GetChannel(ctx, stream)
if err != nil {
return err
}
return a.gbs.StopPlay(ctx, &gbs.StopPlayInput{Channel: ch})
}
// OnStreamNotFound implements ipc.Protocoler.
func (a *Adapter) OnStreamNotFound(ctx context.Context, app string, stream string) error {
ch, err := a.adapter.GetChannel(ctx, stream)
if err != nil {
return err
}
dev, err := a.adapter.GetDevice(ctx, ch.DID)
if err != nil {
return err
}
svr, err := a.smsCore.GetMediaServer(ctx, sms.DefaultMediaServerID)
if err != nil {
return err
}
return a.gbs.Play(&gbs.PlayInput{
Channel: ch,
StreamMode: dev.StreamMode,
SMS: svr,
})
}
// QueryCatalog implements ipc.Protocoler.
func (a *Adapter) QueryCatalog(ctx context.Context, device *ipc.Device) error {
panic("unimplemented")
}
// StartPlay implements ipc.Protocoler.
func (a *Adapter) StartPlay(ctx context.Context, device *ipc.Device, channel *ipc.Channel) (*ipc.PlayResponse, error) {
panic("unimplemented")
}
// StopPlay implements ipc.Protocoler.
func (a *Adapter) StopPlay(ctx context.Context, device *ipc.Device, channel *ipc.Channel) error {
panic("unimplemented")
}
// ValidateDevice implements ipc.Protocoler.
func (a *Adapter) ValidateDevice(ctx context.Context, device *ipc.Device) error {
panic("unimplemented")
}
+65
View File
@@ -0,0 +1,65 @@
package onvifadapter
import (
"context"
"fmt"
"log/slog"
"github.com/gowvp/gb28181/internal/core/ipc"
"github.com/gowvp/gb28181/internal/core/sms"
"github.com/gowvp/gb28181/pkg/zlm"
"github.com/ixugo/goddd/pkg/orm"
)
func (a *Adapter) OnStreamChanged(ctx context.Context, stream string) error {
var ch ipc.Channel
if err := a.adapter.Store().Channel().Get(ctx, &ch, orm.Where("id=?", stream)); err != nil {
return err
}
if err := a.adapter.EditPlaying(ctx, ch.DeviceID, ch.ChannelID, false); err != nil {
slog.ErrorContext(ctx, "编辑播放状态失败", "err", err)
}
return nil
}
func (a *Adapter) OnStreamNotFound(ctx context.Context, app, stream string) error {
var ch ipc.Channel
if err := a.adapter.Store().Channel().Get(ctx, &ch, orm.Where("id=?", stream)); err != nil {
return err
}
onvifDev, ok := a.devices.Load(ch.DeviceID)
if !ok {
return fmt.Errorf("ONVIF 设备未初始化")
}
streamURI, err := a.getStreamURI(ctx, onvifDev, ch.ChannelID)
if err != nil {
return err
}
svr, err := a.sms.GetMediaServer(ctx, sms.DefaultMediaServerID)
if err != nil {
return err
}
_, err = a.sms.AddStreamProxy(svr, zlm.AddStreamProxyRequest{
Vhost: "__defaultVhost__",
App: app,
Stream: stream,
URL: streamURI,
RetryCount: 3,
TimeoutSec: 10,
EnableHLSFMP4: zlm.NewBool(true),
EnableAudio: zlm.NewBool(true),
EnableRTSP: zlm.NewBool(true),
EnableRTMP: zlm.NewBool(true),
AddMuteAudio: zlm.NewBool(true),
AutoClose: zlm.NewBool(true),
})
if err == nil {
if err := a.adapter.EditPlaying(ctx, ch.DeviceID, ch.ChannelID, true); err != nil {
slog.ErrorContext(ctx, "编辑播放状态失败", "err", err)
}
}
return err
}
@@ -0,0 +1,99 @@
package onvifadapter
import (
"context"
"log/slog"
"time"
"github.com/gowvp/gb28181/internal/core/ipc"
devicemodel "github.com/gowvp/onvif/device"
sdkdevice "github.com/gowvp/onvif/sdk/device"
"github.com/ixugo/goddd/pkg/conc"
"github.com/ixugo/goddd/pkg/orm"
)
// startHealthCheck 启动 ONVIF 设备健康检查(异步心跳 + 状态机方案)
//
// 设计思路:
// 1. 协程 1: 定期发送心跳(30秒),成功则更新内存中的最后心跳时间
// 2. 协程 2: 定期检查状态(1秒),超时未心跳则标记离线
// 3. 状态变化时才同步到数据库,减少数据库写入
func (a *Adapter) startHealthCheck(ctx context.Context) {
const (
heartbeatInterval = 30 * time.Second // 心跳间隔:30秒
checkInterval = 1 * time.Second // 状态检查间隔:1秒
heartbeatTimeout = 70 * time.Second // 心跳超时:60秒
)
// 协程 1: 定期发送心跳
go a.startHeartbeat(ctx, heartbeatInterval)
// 协程 2: 定期检查状态
go a.startStatusChecker(ctx, checkInterval, heartbeatTimeout)
}
// startHeartbeat 协程 1: 定期发送心跳
func (a *Adapter) startHeartbeat(ctx context.Context, interval time.Duration) {
conc.Timer(ctx, interval, interval, func() {
a.devices.Range(func(deviceID string, dev *Device) bool {
// TODO: 设计上预期接入数量较少,可以开几个协程
// 如果接入设备数量较多,需要优化
go a.sendHeartbeat(dev)
return true
})
})
}
func (a *Adapter) sendHeartbeat(dev *Device) {
_, err := sdkdevice.Call_GetDeviceInformation(context.TODO(), dev.Device, devicemodel.GetDeviceInformation{})
if err == nil {
dev.KeepaliveAt = orm.Now()
}
}
// startStatusChecker 协程 2: 定期检查状态
func (a *Adapter) startStatusChecker(ctx context.Context, interval, heartbeatTimeout time.Duration) {
conc.Timer(ctx, interval, interval, func() {
now := time.Now()
a.devices.Range(func(did string, dev *Device) bool {
timeSinceLastKeepalive := now.Sub(dev.KeepaliveAt.Time)
isOnline := timeSinceLastKeepalive < heartbeatTimeout
if dev.IsOnline == isOnline {
return true
}
dev.IsOnline = isOnline
// 记录状态变化日志
if isOnline {
slog.InfoContext(ctx, "ONVIF 设备上线",
"device_id", did,
"offline_duration", timeSinceLastKeepalive)
} else {
slog.WarnContext(ctx, "ONVIF 设备离线",
"device_id", did,
"last_keepalive", dev.KeepaliveAt.Time,
"timeout", timeSinceLastKeepalive)
}
a.syncDeviceStatusToDB(ctx, did, isOnline)
return true
})
})
}
// syncDeviceStatusToDB 同步设备状态到数据库(状态变化时调用)
func (a *Adapter) syncDeviceStatusToDB(ctx context.Context, did string, isOnline bool) {
// 更新设备状态
if err := a.adapter.Edit(did, func(d *ipc.Device) {
d.IsOnline = isOnline
if isOnline {
d.KeepaliveAt = orm.Now()
}
}); err != nil {
slog.ErrorContext(ctx, "更新设备在线状态失败", "err", err, "device_id", did)
return
}
}
+21
View File
@@ -0,0 +1,21 @@
package onvifadapter
import (
"strings"
"github.com/gowvp/onvif"
)
type DiscoverResponse struct {
Addr string `json:"addr"`
}
func toDiscoverResponse(dev *onvif.Device) *DiscoverResponse {
addr := dev.GetDeviceParams().Xaddr
if !strings.Contains(addr, ":") {
addr += ":80"
}
return &DiscoverResponse{
Addr: addr,
}
}
+295
View File
@@ -0,0 +1,295 @@
package onvifadapter
import (
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"strings"
"time"
"github.com/gowvp/gb28181/internal/core/ipc"
"github.com/gowvp/gb28181/internal/core/sms"
"github.com/gowvp/onvif"
devicemodel "github.com/gowvp/onvif/device"
m "github.com/gowvp/onvif/media"
sdkdevice "github.com/gowvp/onvif/sdk/device"
sdkmedia "github.com/gowvp/onvif/sdk/media"
xsdonvif "github.com/gowvp/onvif/xsd/onvif"
"github.com/ixugo/goddd/pkg/conc"
"github.com/ixugo/goddd/pkg/orm"
)
var _ ipc.Protocoler = (*Adapter)(nil)
// Adapter ONVIF 协议适配器
//
// 设计说明:
// - 适配器实现 ipc.Protocol 接口(Port 在 ipc 包内)
// - 适配器直接依赖领域模型 (ipc.Device, ipc.Channel)
// - 适配器依赖 ipc.Adapter 来访问存储和通用功能
// - 这符合清晰架构: 外层(适配器)依赖内层(领域)
type Adapter struct {
devices conc.Map[string, *Device] // ONVIF 设备连接缓存
adapter ipc.Adapter // 通用适配器,提供 SaveChannels 等方法
client *http.Client
sms sms.Core
}
// Device ONVIF 设备包装(内存状态 + ONVIF 连接)
type Device struct {
*onvif.Device
KeepaliveAt orm.Time // 最后心跳时间
IsOnline bool // 在线状态(内存缓存)
}
// DeleteDevice implements ipc.Protocoler.
func (a *Adapter) DeleteDevice(ctx context.Context, device *ipc.Device) error {
a.devices.Delete(device.ID)
return nil
}
func NewAdapter(adapter ipc.Adapter, sms sms.Core) *Adapter {
cli := *http.DefaultClient
cli.Timeout = time.Millisecond * 3000
a := Adapter{
adapter: adapter,
client: &cli,
sms: sms,
}
a.init()
// 启动健康检查
go a.startHealthCheck(context.Background())
return &a
}
func (a *Adapter) init() {
devices, err := a.adapter.FindDevices(context.TODO())
if err != nil {
panic(err)
}
for _, device := range devices {
if device.IsOnvif() {
go func(device *ipc.Device) {
onvifDev, err := onvif.NewDevice(onvif.DeviceParams{
Xaddr: fmt.Sprintf("%s:%d", device.IP, device.Port),
Username: device.GetUsername(),
Password: device.Password,
HttpClient: a.client,
})
if err != nil {
_ = a.adapter.Edit(device.ID, func(d *ipc.Device) {
d.IsOnline = false
})
slog.Error("初始化 ONVIF 设备失败", "err", err, "device_id", device.ID)
}
a.devices.Store(device.ID, &Device{
Device: onvifDev,
KeepaliveAt: orm.Now(),
IsOnline: err == nil,
})
}(device)
}
}
}
// ValidateDevice 实现 ipc.Protocol 接口 - ONVIF 设备验证
//
// 直接使用 *ipc.Device 作为参数,不需要类型断言!
func (a *Adapter) ValidateDevice(ctx context.Context, dev *ipc.Device) error {
// 不需要类型断言,直接使用领域模型
// 直接访问领域模型的字段
onvifDev, err := onvif.NewDevice(onvif.DeviceParams{
Xaddr: fmt.Sprintf("%s:%d", dev.IP, dev.Port),
Username: dev.GetUsername(),
Password: dev.Password,
HttpClient: a.client,
})
if err != nil {
return fmt.Errorf("IP 或 PORT 错误: %w", err)
}
// 获取设备信息并填充到领域模型
resp, err := sdkdevice.Call_GetDeviceInformation(ctx, onvifDev, devicemodel.GetDeviceInformation{})
if err != nil {
return fmt.Errorf("账号或密码错误: %w", err)
}
dev.Transport = "tcp"
dev.Ext.Firmware = resp.FirmwareVersion
dev.Ext.Manufacturer = resp.Manufacturer
dev.Ext.Model = resp.Model
dev.IsOnline = true
dev.Address = fmt.Sprintf("%s:%d", dev.IP, dev.Port)
return nil
}
// InitDevice 实现 ipc.Protocol 接口 - 初始化 ONVIF 设备
// ONVIF 设备初始化时,自动查询 Profiles 并创建为通道
func (a *Adapter) InitDevice(ctx context.Context, dev *ipc.Device) error {
// 创建 ONVIF 连接
onvifDev, err := onvif.NewDevice(onvif.DeviceParams{
Xaddr: fmt.Sprintf("%s:%d", dev.IP, dev.Port),
Username: dev.GetUsername(),
Password: dev.Password,
HttpClient: a.client,
})
if err != nil {
return err
}
// 缓存设备连接
d := Device{
Device: onvifDev,
KeepaliveAt: orm.Now(),
IsOnline: true,
}
a.devices.Store(dev.ID, &d)
// 自动查询 Profiles 作为通道
return a.queryAndSaveProfiles(ctx, dev, &d)
}
// QueryCatalog 实现 ipc.Protocol 接口 - ONVIF 查询 Profiles
func (a *Adapter) QueryCatalog(ctx context.Context, dev *ipc.Device) error {
onvifDev, ok := a.devices.Load(dev.ID)
if !ok {
// 设备连接不在缓存中,尝试重新连接
var err error
d, err := onvif.NewDevice(onvif.DeviceParams{
Xaddr: fmt.Sprintf("%s:%d", dev.IP, dev.Port),
Username: dev.GetUsername(),
Password: dev.Password,
})
if err != nil {
return fmt.Errorf("ONVIF 设备未初始化: %w", err)
}
onvifDev = &Device{
Device: d,
KeepaliveAt: orm.Now(),
IsOnline: true,
}
a.devices.Store(dev.ID, onvifDev)
}
return a.queryAndSaveProfiles(ctx, dev, onvifDev)
}
// StartPlay 实现 ipc.Protocol 接口 - ONVIF 播放
func (a *Adapter) StartPlay(ctx context.Context, dev *ipc.Device, ch *ipc.Channel) (*ipc.PlayResponse, error) {
onvifDev, ok := a.devices.Load(dev.ID)
if !ok {
return nil, fmt.Errorf("ONVIF 设备未初始化")
}
// 获取 RTSP 地址
streamURI, err := a.getStreamURI(ctx, onvifDev, ch.ChannelID)
if err != nil {
return nil, err
}
return &ipc.PlayResponse{
RTSP: streamURI,
}, nil
}
// StopPlay 实现 ipc.Protocol 接口 - ONVIF 停止播放
func (a *Adapter) StopPlay(ctx context.Context, dev *ipc.Device, ch *ipc.Channel) error {
// ONVIF 通常不需要显式停止播放
return nil
}
// queryAndSaveProfiles 查询 ONVIF Profiles 并保存为通道
//
// 使用统一的 SaveChannels 方法,自动处理增量更新和删除
func (a *Adapter) queryAndSaveProfiles(ctx context.Context, device *ipc.Device, onvifDev *Device) error {
resp, err := sdkmedia.Call_GetProfiles(ctx, onvifDev.Device, m.GetProfiles{})
if err != nil {
return fmt.Errorf("账号或密码错误: %w", err)
}
// 将 Profiles 转换为通道列表
channels := make([]*ipc.Channel, 0, len(resp.Profiles))
for _, profile := range resp.Profiles {
channel := &ipc.Channel{
DeviceID: device.ID,
ChannelID: string(profile.Token),
Name: string(profile.Name),
DID: device.ID,
IsOnline: true,
Type: ipc.TypeOnvif,
}
channels = append(channels, channel)
}
// 使用统一的 SaveChannels 方法保存(自动处理增删改)
if err := a.adapter.SaveChannels(channels); err != nil {
return fmt.Errorf("保存 ONVIF 通道失败: %w", err)
}
slog.InfoContext(ctx, "ONVIF Profiles 同步完成",
"device_id", device.ID,
"profile_count", len(channels))
return nil
}
// getStreamURI 获取 RTSP 流地址
func (a *Adapter) getStreamURI(ctx context.Context, dev *Device, profileToken string) (string, error) {
var param m.GetStreamUri
param.StreamSetup.Transport.Protocol = "RTSP"
param.StreamSetup.Stream = "RTP-Unicast"
param.ProfileToken = xsdonvif.ReferenceToken(profileToken)
resp, err := sdkmedia.Call_GetStreamUri(ctx, dev.Device, param)
if err != nil {
return "", err
}
playURL := buildPlayURL(string(resp.MediaUri.Uri), dev.Device.GetDeviceParams().Username, dev.Device.GetDeviceParams().Password)
return playURL, nil
}
func buildPlayURL(rawurl, username, password string) string {
if username != "" && password != "" {
return strings.Replace(rawurl, "rtsp://", fmt.Sprintf("rtsp://%s:%s@", username, password), 1)
}
return rawurl
}
func (a *Adapter) Discover(ctx context.Context, w io.Writer) error {
recv, cancel, err := onvif.AllAvailableDevicesAtSpecificEthernetInterfaces()
if err != nil {
return err
}
defer cancel()
for {
select {
case dev, ok := <-recv:
if !ok {
return nil
}
var exists bool
a.devices.Range(func(key string, value *Device) bool {
if value.GetDeviceParams().Xaddr == dev.GetDeviceParams().Xaddr {
exists = true
return false
}
return true
})
if exists {
continue
}
b, _ := json.Marshal(toDiscoverResponse(dev))
_, _ = w.Write(b)
case <-ctx.Done():
return nil
case <-time.After(3 * time.Second):
slog.DebugContext(ctx, "discover timeout")
return nil
}
}
}
+94
View File
@@ -0,0 +1,94 @@
package rtspadapter
import (
"context"
"github.com/gowvp/gb28181/internal/core/ipc"
"github.com/gowvp/gb28181/internal/core/proxy"
"github.com/gowvp/gb28181/internal/core/sms"
"github.com/gowvp/gb28181/pkg/zlm"
)
var _ ipc.Protocoler = (*Adapter)(nil)
type Adapter struct {
proxyCore *proxy.Core
smsCore sms.Core
}
// DeleteDevice implements ipc.Protocoler.
func (a *Adapter) DeleteDevice(ctx context.Context, device *ipc.Device) error {
return nil
}
func NewAdapter(proxyCore *proxy.Core, smsCore sms.Core) *Adapter {
return &Adapter{
proxyCore: proxyCore,
smsCore: smsCore,
}
}
// InitDevice implements ipc.Protocoler.
func (a *Adapter) InitDevice(ctx context.Context, device *ipc.Device) error {
panic("unimplemented")
}
// OnStreamChanged implements ipc.Protocoler.
func (a *Adapter) OnStreamChanged(ctx context.Context, stream string) error {
return nil
}
// OnStreamNotFound implements ipc.Protocoler.
func (a *Adapter) OnStreamNotFound(ctx context.Context, app string, stream string) error {
proxy, err := a.proxyCore.GetStreamProxy(ctx, stream)
if err != nil {
return err
}
svr, err := a.smsCore.GetMediaServer(ctx, sms.DefaultMediaServerID)
if err != nil {
return err
}
resp, err := a.smsCore.AddStreamProxy(svr, zlm.AddStreamProxyRequest{
Vhost: "__defaultVhost__",
App: proxy.App,
Stream: proxy.Stream,
URL: proxy.SourceURL,
RetryCount: 3,
RTPType: proxy.Transport,
TimeoutSec: 10,
EnableHLSFMP4: zlm.NewBool(true),
EnableAudio: zlm.NewBool(true),
EnableRTSP: zlm.NewBool(true),
EnableRTMP: zlm.NewBool(true),
AddMuteAudio: zlm.NewBool(true),
AutoClose: zlm.NewBool(true),
})
if err != nil {
return err
}
// 用于关闭
a.proxyCore.EditStreamProxyKey(ctx, resp.Data.Key, proxy.ID)
return nil
}
// QueryCatalog implements ipc.Protocoler.
func (a *Adapter) QueryCatalog(ctx context.Context, device *ipc.Device) error {
panic("unimplemented")
}
// StartPlay implements ipc.Protocoler.
func (a *Adapter) StartPlay(ctx context.Context, device *ipc.Device, channel *ipc.Channel) (*ipc.PlayResponse, error) {
panic("unimplemented")
}
// StopPlay implements ipc.Protocoler.
func (a *Adapter) StopPlay(ctx context.Context, device *ipc.Device, channel *ipc.Channel) error {
panic("unimplemented")
}
// ValidateDevice implements ipc.Protocoler.
func (a *Adapter) ValidateDevice(ctx context.Context, device *ipc.Device) error {
panic("unimplemented")
}
+9 -9
View File
@@ -35,8 +35,8 @@ func Run(bc *conf.Bootstrap) {
go setupZLM(ctx, bc.ConfigDir)
// 如果需要执行表迁移,递增此版本号和表更新说明
versionapi.DBVersion = "0.0.14"
versionapi.DBRemark = "add stream proxy"
versionapi.DBVersion = "0.0.17"
versionapi.DBRemark = "onvif device support"
handler, cleanUp, err := wireApp(bc, log)
if err != nil {
@@ -100,12 +100,6 @@ func setupZLM(ctx context.Context, dir string) {
workDir := system.Getwd()
configPath := filepath.Join(dir, "zlm.ini")
// 预创建命令实例,在循环中重用
cmd := exec.CommandContext(ctx, "./MediaServer", "-s", "default.pem", "-c", configPath)
cmd.Dir = workDir
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
for {
select {
case <-ctx.Done():
@@ -113,6 +107,12 @@ func setupZLM(ctx context.Context, dir string) {
return
default:
slog.Info("MediaServer 启动中...")
cmd := exec.CommandContext(ctx, "./MediaServer", "-s", "default.pem", "-c", configPath)
cmd.Dir = workDir
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.Env = os.Environ()
// 启动命令 - 正常情况下会阻塞在这里
if err := cmd.Run(); err != nil {
slog.Error("zlm 运行失败", "err", err)
@@ -121,7 +121,7 @@ func setupZLM(ctx context.Context, dir string) {
}
// 等待后重启(不管是正常退出还是异常退出)
time.Sleep(5 * time.Second)
time.Sleep(2 * time.Second)
}
}
}
+6 -5
View File
@@ -30,14 +30,15 @@ func wireApp(bc *conf.Bootstrap, log *slog.Logger) (http.Handler, func(), error)
uniqueidCore := api.NewUniqueID(db)
pushCore := api.NewPushCore(db, uniqueidCore)
storer := api.NewIPCStore(db)
gbdAdapter := api.NewGBAdapter(storer, uniqueidCore)
server, cleanup := gbs.NewServer(bc, gbdAdapter, smsCore)
v := api.NewProtocols(storer)
adapter := api.NewGBAdapter(storer, uniqueidCore)
server, cleanup := gbs.NewServer(bc, adapter, smsCore)
proxyCore := api.NewProxyCore(db, uniqueidCore)
v := api.NewProtocols(adapter, smsCore, proxyCore, server)
ipcCore := api.NewIPCCore(storer, uniqueidCore, v)
webHookAPI := api.NewWebHookAPI(smsCore, pushCore, bc, server, ipcCore)
webHookAPI := api.NewWebHookAPI(smsCore, pushCore, bc, server, ipcCore, v)
pushAPI := api.NewPushAPI(pushCore, smsCore, bc)
ipcapi := api.NewIPCAPI(ipcCore)
proxyAPI := api.NewProxyAPI(db, uniqueidCore)
proxyAPI := api.NewProxyAPI(proxyCore)
configAPI := api.NewConfigAPI(db, bc)
userAPI := api.NewUserAPI(bc)
usecase := &api.Usecase{
+28
View File
@@ -0,0 +1,28 @@
package bz
import "strings"
const (
IDPrefixGB = "gb" // 国标设备
IDPrefixGBChannel = "ch" // 国标通道 id 前缀,channel
IDPrefixOnvif = "on" // onvif 设备 id 前缀
IDPrefixOnvifChannel = "pr" // onvif 通道 id 前缀,profile
IDPrefixRTMP = "mp" // rtmp ID 前缀,取 rtmp 后缀的 mp,不好记但是清晰
IDPrefixRTSP = "sp" // rtsp ID 前缀,取 rtsp 后缀的 sp,不好记但是清晰
)
func IsGB28181(stream string) bool {
return strings.HasPrefix(stream, IDPrefixGB) || strings.HasPrefix(stream, IDPrefixGBChannel)
}
func IsOnvif(stream string) bool {
return strings.HasPrefix(stream, IDPrefixOnvif) || strings.HasPrefix(stream, IDPrefixOnvifChannel)
}
func IsRTMP(stream string) bool {
return strings.HasPrefix(stream, IDPrefixRTMP)
}
func IsRTSP(stream string) bool {
return strings.HasPrefix(stream, IDPrefixRTSP)
}
-9
View File
@@ -1,9 +0,0 @@
package bz
const (
IDPrefixGB = "gb" // 国标设备
IDPrefixOnvif = "on" // 国标设备
IDPrefixGBChannel = "ch" // 国标通道 id 前缀
IDPrefixRTMP = "mp" // rtmp ID 前缀,取 rtmp 后缀的 mp,不好记但是清晰
IDPrefixRTSP = "sp" // rtsp ID 前缀,取 rtsp 后缀的 sp,不好记但是清晰
)
+3
View File
@@ -11,6 +11,7 @@ import (
"github.com/ixugo/goddd/pkg/orm"
"github.com/ixugo/goddd/pkg/reason"
"github.com/jinzhu/copier"
"gorm.io/gorm"
)
// ChannelStorer Instantiation interface
@@ -22,6 +23,7 @@ type ChannelStorer interface {
Del(context.Context, *Channel, ...orm.QueryOption) error
BatchEdit(context.Context, string, any, ...orm.QueryOption) error // 批量更新一个字段
Session(ctx context.Context, changeFns ...func(*gorm.DB) error) error
}
// FindChannel Paginated search
@@ -83,6 +85,7 @@ func (c *Core) AddChannel(ctx context.Context, in *AddChannelInput) (*Channel, e
// EditChannel Update object information
func (c *Core) EditChannel(ctx context.Context, in *EditChannelInput, id string) (*Channel, error) {
// TODO: 修改 onvif 的账号/密码 后需要重新连接设备
var out Channel
if err := c.store.Channel().Edit(ctx, &out, func(b *Channel) {
if err := copier.Copy(b, in); err != nil {
+16 -2
View File
@@ -1,7 +1,12 @@
// Code generated by godddx, DO AVOID EDIT.
package ipc
import "github.com/ixugo/goddd/pkg/orm"
import (
"strings"
"github.com/gowvp/gb28181/internal/core/bz"
"github.com/ixugo/goddd/pkg/orm"
)
// Channel domain model
type Channel struct {
@@ -16,6 +21,7 @@ type Channel struct {
Ext DeviceExt `gorm:"column:ext;notNull;default:'{}';type:jsonb" json:"ext"`
CreatedAt orm.Time `gorm:"column:created_at;notNull;default:CURRENT_TIMESTAMP;comment:创建时间" json:"created_at"` // 创建时间
UpdatedAt orm.Time `gorm:"column:updated_at;notNull;default:CURRENT_TIMESTAMP;comment:更新时间" json:"updated_at"` // 更新时间
Type string `gorm:"column:type;notNull;default:'';comment:通道类型" json:"type"` // 通道类型,继承父级设备类型
}
// TableName database table name
@@ -32,6 +38,14 @@ func (c *Channel) GetChannelID() string {
return c.ChannelID
}
func (c *Channel) GetDeviceID() string {
func (c *Channel) GetGB28181DeviceID() string {
return c.DeviceID
}
func (c *Channel) IsOnvif() bool {
return c.Type == TypeOnvif || bz.IsOnvif(c.ID)
}
func (c *Channel) IsGB28181() bool {
return strings.HasPrefix(c.ID, bz.IDPrefixGBChannel) || c.Type == TypeGB28181 || c.Type == ""
}
+6 -3
View File
@@ -2,7 +2,6 @@
package ipc
import (
"github.com/gowvp/gb28181/internal/core/port"
"github.com/ixugo/goddd/domain/uniqueid"
)
@@ -16,14 +15,18 @@ type Storer interface {
type Core struct {
store Storer
uniqueID uniqueid.Core
protocols map[string]port.Protocol // 协议映射
protocols map[string]Protocoler // 协议映射Protocol 在同一个包内)
}
// NewCore create business domain
func NewCore(store Storer, uni uniqueid.Core, protocols map[string]port.Protocol) Core {
func NewCore(store Storer, uni uniqueid.Core, protocols map[string]Protocoler) Core {
return Core{
store: store,
uniqueID: uni,
protocols: protocols,
}
}
func (c *Core) GetProtocol(atype string) Protocoler {
return c.protocols[atype]
}
+21 -5
View File
@@ -5,7 +5,6 @@ import (
"context"
"log/slog"
"github.com/gowvp/gb28181/internal/core/bz"
"github.com/ixugo/goddd/pkg/orm"
"github.com/ixugo/goddd/pkg/reason"
"github.com/ixugo/goddd/pkg/web"
@@ -104,17 +103,17 @@ func (c Core) AddDevice(ctx context.Context, in *AddDeviceInput) (*Device, error
}
// 协议验证(通过接口调用)
if protocol, ok := c.protocols[out.Type]; ok {
if protocol, ok := c.protocols[out.GetType()]; ok {
if err := protocol.ValidateDevice(ctx, &out); err != nil {
return nil, reason.ErrBadRequest.SetMsg(err.Error())
}
}
out.ID = GenerateDID(&out, c.uniqueID)
if out.IsOnvif() {
out.ID = c.uniqueID.UniqueID(bz.IDPrefixOnvif)
out.DeviceID = out.ID
} else {
out.ID = c.uniqueID.UniqueID(bz.IDPrefixGB)
out.Username = out.DeviceID
}
if err := out.Check(); err != nil {
@@ -130,7 +129,7 @@ func (c Core) AddDevice(ctx context.Context, in *AddDeviceInput) (*Device, error
}
// 初始化协议连接(失败不影响设备添加)
if protocol, ok := c.protocols[out.Type]; ok {
if protocol, ok := c.protocols[out.GetType()]; ok {
if err := protocol.InitDevice(ctx, &out); err != nil {
slog.WarnContext(ctx, "初始化协议失败", "err", err, "device_id", out.ID)
}
@@ -158,5 +157,22 @@ func (c Core) DelDevice(ctx context.Context, id string) (*Device, error) {
if err := c.store.Device().Del(ctx, &dev, orm.Where("id=?", id)); err != nil {
return nil, reason.ErrDB.Withf(`Del err[%s]`, err.Error())
}
if err := c.store.Channel().Session(ctx, func(tx *gorm.DB) error {
if err := orm.Delete(tx, &dev, orm.Where("id=?", id)); err != nil {
return err
}
if protocol, ok := c.protocols[dev.GetType()]; ok {
if err := protocol.DeleteDevice(ctx, &dev); err != nil {
return err
}
}
return nil
}, func(d *gorm.DB) error {
return d.Where("did=?", id).Delete(&Channel{}).Error
}); err != nil {
return nil, reason.ErrDB.Withf(`DelChannel err[%s]`, err.Error())
}
return &dev, nil
}
+22 -12
View File
@@ -5,14 +5,19 @@ import (
"fmt"
"strings"
"github.com/gowvp/gb28181/internal/core/bz"
"github.com/ixugo/goddd/pkg/orm"
)
// Device domain model
type Device struct {
ID string `gorm:"primaryKey" json:"id"`
Type string `gorm:"column:type;notNull;default:'';comment:设备类型(onvif)" json:"type"` // 设备类型(onvif/gb28181)
DeviceID string `gorm:"column:device_id;notNull;uniqueIndex;default:'';comment:20 位国标编号" json:"device_id"` // 20 位国标编号
ID string `gorm:"primaryKey" json:"id"`
Type string `gorm:"column:type;notNull;default:'';comment:设备类型(onvif)" json:"type"` // 设备类型(onvif/gb28181)
// 自定义 id 可以简写为 did,国标 device_id 应简写成 gbid,而不是 device_id
// 起这个名义会难以区分,但为了兼容已经部署的旧数据库...保留
DeviceID string `gorm:"column:device_id;notNull;uniqueIndex;default:'';comment:20 位国标编号" json:"device_id"` // 20 位国标编号
Name string `gorm:"column:name;notNull;default:'';comment:设备名称" json:"name"` // 设备名称
Transport string `gorm:"column:transport;notNull;default:'';comment:传输协议(tcp/udp)" json:"transport"` // 传输协议(TCP/UDP)
StreamMode int8 `gorm:"column:stream_mode;notNull;default:1;comment:数据传输模式(0:UDP; 1:TCP_PASSIVE; 2:TCP_ACTIVE)" json:"stream_mode"` // 数据传输模式
@@ -29,18 +34,17 @@ type Device struct {
Password string `gorm:"column:password;notNull;default:'';comment:注册密码" json:"password"`
Address string `gorm:"column:address;notNull;default:'';comment:设备网络地址" json:"address"`
Ext DeviceExt `gorm:"column:ext;notNull;default:'{}';type:jsonb;comment:设备属性" json:"ext"` // 设备属性
// onvif
Username string `gorm:"column:username;notNull;default:'';comment:用户名" json:"username"`
Username string `gorm:"column:username;notNull;default:'';comment:用户名" json:"username"`
Children []*Channel `gorm:"-" json:"children,omitzero"`
}
func (d *Device) IsOnvif() bool {
return strings.ToUpper(d.Type) == "ONVIF"
return strings.HasPrefix(d.ID, bz.IDPrefixOnvif) || d.Type == TypeOnvif
}
func (d *Device) IsGB28181() bool {
return strings.ToUpper(d.Type) == "GB28181" || d.Type == ""
return strings.HasPrefix(d.ID, bz.IDPrefixGB) || d.Type == TypeGB28181 || d.Type == ""
}
// TableName database table name
@@ -49,7 +53,7 @@ func (*Device) TableName() string {
}
func (d Device) Check() error {
if d.IsGB28181() && len(d.DeviceID) < 18 {
if d.IsGB28181() && len(d.Username) < 18 {
return fmt.Errorf("国标 ID 长度应大于等于 18 位")
}
if d.IsOnvif() {
@@ -69,9 +73,9 @@ func (d Device) Check() error {
return nil
}
func (d *Device) init(id, deviceID string) {
func (d *Device) init(id, gbid string) {
d.ID = id
d.DeviceID = deviceID
d.DeviceID = gbid
}
func (d *Device) NetworkAddress() string {
@@ -83,12 +87,18 @@ func (d *Device) GetID() string {
return d.ID
}
func (d *Device) GetDeviceID() string {
func (d *Device) GetGB28181DeviceID() string {
// if d.Username != "" {
// return d.Username
// }
return d.DeviceID
}
func (d *Device) GetType() string {
return d.Type
if d.Type != "" {
return d.Type
}
return GetType(d.ID)
}
func (d *Device) GetIP() string {
+1 -1
View File
@@ -52,7 +52,7 @@ type AddDeviceInput struct {
Name string `json:"name"` // 设备名称
Password string `json:"password"` // 注册密码
Type string `json:"type"` // 设备类型(onvif/gb28181)
Type string `json:"type"` // 设备类型(ONVIF/GB28181)
// Addr string `json:"addr"` // 地址(ip:port)
-115
View File
@@ -1,115 +0,0 @@
package ipc
import (
"context"
"github.com/gowvp/gb28181/internal/core/bz"
"github.com/ixugo/goddd/domain/uniqueid"
"github.com/ixugo/goddd/pkg/orm"
"github.com/ixugo/goddd/pkg/web"
)
type GBDAdapter struct {
// deviceStore DeviceStorer
// channelStore ChannelStorer
store Storer
uni uniqueid.Core
}
func NewGBAdapter(store Storer, uni uniqueid.Core) GBDAdapter {
return GBDAdapter{
store: store,
uni: uni,
}
}
func (g GBDAdapter) Store() Storer {
return g.store
}
func (g GBDAdapter) GetDeviceByDeviceID(deviceID string) (*Device, error) {
ctx := context.TODO()
var d Device
if err := g.store.Device().Get(ctx, &d, orm.Where("device_id=?", deviceID)); err != nil {
if !orm.IsErrRecordNotFound(err) {
return nil, err
}
d.init(g.uni.UniqueID(bz.IDPrefixGB), deviceID)
if err := g.store.Device().Add(ctx, &d); err != nil {
return nil, err
}
}
return &d, nil
}
func (g GBDAdapter) Logout(deviceID string, changeFn func(*Device)) error {
var d Device
if err := g.store.Device().Edit(context.TODO(), &d, func(d *Device) {
changeFn(d)
}, orm.Where("device_id=?", deviceID)); err != nil {
return err
}
return nil
}
func (g GBDAdapter) Edit(deviceID string, changeFn func(*Device)) error {
var d Device
if err := g.store.Device().Edit(context.TODO(), &d, func(d *Device) {
changeFn(d)
}, orm.Where("device_id=?", deviceID)); err != nil {
return err
}
return nil
}
func (g GBDAdapter) EditPlaying(deviceID, channelID string, playing bool) error {
var ch Channel
if err := g.store.Channel().Edit(context.TODO(), &ch, func(c *Channel) {
c.IsPlaying = playing
}, orm.Where("device_id = ? AND channel_id = ?", deviceID, channelID)); err != nil {
return err
}
return nil
}
func (g GBDAdapter) SaveChannels(channels []*Channel) error {
if len(channels) <= 0 {
return nil
}
var dev Device
_ = g.store.Device().Edit(context.TODO(), &dev, func(d *Device) {
d.Channels = len(channels)
}, orm.Where("device_id=?", channels[0].DeviceID))
// chIDs := make([]string, 0, 8)
for _, channel := range channels {
var ch Channel
if err := g.store.Channel().Edit(context.TODO(), &ch, func(c *Channel) {
c.IsOnline = channel.IsOnline
ch.DID = dev.ID
}, orm.Where("device_id = ? AND channel_id = ?", channel.DeviceID, channel.ChannelID)); err != nil {
channel.ID = g.uni.UniqueID(bz.IDPrefixGBChannel)
channel.DID = dev.ID
_ = g.store.Channel().Add(context.TODO(), channel)
}
// chIDs = append(chIDs, channel.ID)
}
// TODO: 清理相关资源
// if len(chIDs) > 0 {
// }
return nil
}
// FindDevices 获取所有设备
func (g GBDAdapter) FindDevices(ctx context.Context) ([]*Device, error) {
var devices []*Device
if _, err := g.store.Device().Find(ctx, &devices, web.NewPagerFilterMaxSize()); err != nil {
return nil, err
}
return devices, nil
}
+23
View File
@@ -5,9 +5,32 @@ import (
"database/sql/driver"
"encoding/json"
"github.com/gowvp/gb28181/internal/core/bz"
"github.com/ixugo/goddd/pkg/orm"
)
const (
TypeGB28181 = "GB28181"
TypeOnvif = "ONVIF"
TypeRTSP = "RTSP"
TypeRTMP = "RTMP"
)
func GetType(stream string) string {
switch true {
case bz.IsGB28181(stream):
return TypeGB28181
case bz.IsOnvif(stream):
return TypeOnvif
case bz.IsRTSP(stream):
return TypeRTSP
case bz.IsRTMP(stream):
return TypeRTMP
default:
return ""
}
}
// DeviceExt domain model
type DeviceExt struct {
Manufacturer string `json:"manufacturer"` // 生产厂商
+52
View File
@@ -0,0 +1,52 @@
package ipc
import (
"context"
)
// Protocoler 协议抽象接口(端口)
//
// 设计原则:
// 1. 接口在 ipc 包内定义,避免循环依赖
// 2. 接口方法直接使用领域模型 (*Device, *Channel)
// 3. 适配器实现此接口,可以直接依赖和修改领域模型
// 4. 符合依赖倒置原则 (DIP):
// - ipc (高层) 依赖 Protocoler 接口
// - adapter (低层) 实现 Protocoler 接口
// - adapter (低层) 依赖 ipc.Device (高层) ✅ 合理
//
// 这就是依赖反转!
type Protocoler interface {
// ValidateDevice 验证设备连接(添加设备前调用)
// 可以修改设备信息(如从 ONVIF 获取的固件版本等)
ValidateDevice(ctx context.Context, device *Device) error
// InitDevice 初始化设备连接(添加设备后调用)
// 例如: GB28181 不需要主动初始化,ONVIF 需要查询 Profiles 作为通道
InitDevice(ctx context.Context, device *Device) error
// QueryCatalog 查询设备目录/通道
QueryCatalog(ctx context.Context, device *Device) error
// StartPlay 开始播放
StartPlay(ctx context.Context, device *Device, channel *Channel) (*PlayResponse, error)
// StopPlay 停止播放
StopPlay(ctx context.Context, device *Device, channel *Channel) error
DeleteDevice(ctx context.Context, device *Device) error
Hooker
}
type Hooker interface {
OnStreamNotFound(ctx context.Context, app, stream string) error
OnStreamChanged(ctx context.Context, stream string) error
}
// PlayResponse 播放响应
type PlayResponse struct {
SSRC string // GB28181 SSRC
Stream string // 流 ID
RTSP string // RTSP 地址 (ONVIF)
}
+64
View File
@@ -0,0 +1,64 @@
package port
import (
"context"
)
// Device 设备接口
// 注意: 适配器实现时,参数类型为 *ipc.Device,满足此接口
type Device interface {
// 这里不定义任何方法,让所有类型都能满足
// 适配器实现时直接使用 *ipc.Device 类型
}
// Channel 通道接口
type Channel interface {
// 同上
}
// Protocol 协议抽象接口(端口)
//
// 设计原则:
// 1. 接口参数使用 Device/Channel 接口(空接口)
// 2. 适配器实现时使用具体类型 *ipc.Device, *ipc.Channel
// 3. 由于是空接口,任何类型都满足,但语义上明确是 Device/Channel
// 4. 避免循环依赖,同时保持类型语义
//
// 使用示例:
//
// // 适配器实现
// func (a *Adapter) ValidateDevice(ctx context.Context, device Device) error {
// dev := device.(*ipc.Device) // 类型断言
// // ...
// }
//
// // 或者更好的方式:在包级别声明具体签名
// func (a *Adapter) ValidateDevice(ctx context.Context, device *ipc.Device) error
type Protocol interface {
// ValidateDevice 验证设备连接(添加设备前调用)
// 参数类型: *ipc.Device
ValidateDevice(ctx context.Context, device Device) error
// InitDevice 初始化设备连接(添加设备后调用)
// 参数类型: *ipc.Device
InitDevice(ctx context.Context, device Device) error
// QueryCatalog 查询设备目录/通道
// 参数类型: *ipc.Device
QueryCatalog(ctx context.Context, device Device) error
// StartPlay 开始播放
// 参数类型: *ipc.Device, *ipc.Channel
StartPlay(ctx context.Context, device Device, channel Channel) (*PlayResponse, error)
// StopPlay 停止播放
// 参数类型: *ipc.Device, *ipc.Channel
StopPlay(ctx context.Context, device Device, channel Channel) error
}
// PlayResponse 播放响应
type PlayResponse struct {
SSRC string // GB28181 SSRC
Stream string // 流 ID
RTSP string // RTSP 地址 (ONVIF)
}
+196
View File
@@ -0,0 +1,196 @@
package ipc
import (
"context"
"github.com/gowvp/gb28181/internal/core/bz"
"github.com/ixugo/goddd/domain/uniqueid"
"github.com/ixugo/goddd/pkg/orm"
"github.com/ixugo/goddd/pkg/web"
)
// 为协议适配,提供协议会用到的功能
type Adapter struct {
// deviceStore DeviceStorer
// channelStore ChannelStorer
store Storer
uni uniqueid.Core
}
func GenerateDID(d *Device, uni uniqueid.Core) string {
if d.IsOnvif() {
return uni.UniqueID(bz.IDPrefixOnvif)
}
return uni.UniqueID(bz.IDPrefixGB)
}
func GenerateChannelID(c *Channel, uni uniqueid.Core) string {
if c.IsOnvif() {
return uni.UniqueID(bz.IDPrefixOnvifChannel)
}
return uni.UniqueID(bz.IDPrefixGBChannel)
}
func NewAdapter(store Storer, uni uniqueid.Core) Adapter {
return Adapter{
store: store,
uni: uni,
}
}
func (g Adapter) Store() Storer {
return g.store
}
func (g Adapter) GetDeviceByDeviceID(gbDeviceID string) (*Device, error) {
ctx := context.TODO()
var d Device
if err := g.store.Device().Get(ctx, &d, orm.Where("device_id=?", gbDeviceID)); err != nil {
if !orm.IsErrRecordNotFound(err) {
return nil, err
}
d.init(g.uni.UniqueID(bz.IDPrefixGB), gbDeviceID)
if err := g.store.Device().Add(ctx, &d); err != nil {
return nil, err
}
}
return &d, nil
}
func (g Adapter) Logout(deviceID string, changeFn func(*Device)) error {
var d Device
if err := g.store.Device().Edit(context.TODO(), &d, func(d *Device) {
changeFn(d)
}, orm.Where("device_id=?", deviceID)); err != nil {
return err
}
return nil
}
func (g Adapter) Edit(deviceID string, changeFn func(*Device)) error {
var d Device
if err := g.store.Device().Edit(context.TODO(), &d, func(d *Device) {
changeFn(d)
}, orm.Where("device_id=?", deviceID)); err != nil {
return err
}
return nil
}
func (g Adapter) EditPlaying(ctx context.Context, deviceID, channelID string, playing bool) error {
var ch Channel
if err := g.store.Channel().Edit(ctx, &ch, func(c *Channel) {
c.IsPlaying = playing
}, orm.Where("device_id = ? AND channel_id = ?", deviceID, channelID)); err != nil {
return err
}
return nil
}
// SaveChannels 保存通道列表(增量更新 + 删除多余通道)
//
// 策略说明:
// 1. 批量查询现有通道(减少数据库查询)
// 2. 对比更新:存在则更新,不存在则新增
// 3. 删除多余:不在上报列表中的通道标记为离线或删除
// 4. 使用事务保证数据一致性
func (g Adapter) SaveChannels(channels []*Channel) error {
if len(channels) <= 0 {
return nil
}
ctx := context.TODO()
deviceID := channels[0].DeviceID
// 1. 获取设备信息
var dev Device
_ = g.store.Device().Edit(context.TODO(), &dev, func(d *Device) {
d.Channels = len(channels)
}, orm.Where("device_id=?", channels[0].DeviceID))
// 2. 批量查询该设备的所有现有通道(一次查询,避免循环查询)
var existingChannels []*Channel
_, _ = g.store.Channel().Find(ctx, &existingChannels,
web.NewPagerFilterMaxSize(),
orm.Where("device_id = ?", deviceID),
)
// 3. 构建 map 方便快速查找
existingMap := make(map[string]*Channel)
for _, ch := range existingChannels {
existingMap[ch.ChannelID] = ch
}
// 4. 收集当前上报的通道 ID
currentChannelIDs := make([]string, 0, len(channels))
// 5. 遍历上报的通道,区分新增和更新
for _, channel := range channels {
currentChannelIDs = append(currentChannelIDs, channel.ChannelID)
if existing, ok := existingMap[channel.ChannelID]; ok {
// 通道已存在,更新信息
_ = g.store.Channel().Edit(ctx, existing, func(c *Channel) {
c.Name = channel.Name
c.IsOnline = channel.IsOnline
c.Ext = channel.Ext
}, orm.Where("id=?", existing.ID))
} else {
// 通道不存在,新增
channel.ID = GenerateChannelID(channel, g.uni)
channel.DID = dev.ID
_ = g.store.Channel().Add(ctx, channel)
}
}
// 6. 删除不再存在的通道(设备上报的通道列表中没有的)
// 方案A:标记为离线(推荐,保留历史数据)
if len(currentChannelIDs) > 0 {
_ = g.store.Channel().BatchEdit(ctx, "is_online", false,
orm.Where("device_id = ?", deviceID),
orm.Where("channel_id NOT IN ?", currentChannelIDs),
)
}
// 方案B:硬删除(如果需要完全删除)
// 可根据业务需求在配置中选择
// var ch Channel
// _ = g.store.Channel().Del(ctx, &ch,
// orm.Where("device_id = ?", deviceID),
// orm.Where("channel_id NOT IN ?", currentChannelIDs),
// )
// 7. 更新设备的通道数量
_ = g.store.Device().Edit(ctx, &dev, func(d *Device) {
d.Channels = len(channels)
}, orm.Where("device_id=?", deviceID))
return nil
}
// FindDevices 获取所有设备
func (g Adapter) FindDevices(ctx context.Context) ([]*Device, error) {
var devices []*Device
if _, err := g.store.Device().Find(ctx, &devices, web.NewPagerFilterMaxSize()); err != nil {
return nil, err
}
return devices, nil
}
func (g Adapter) GetChannel(ctx context.Context, id string) (*Channel, error) {
var ch Channel
if err := g.store.Channel().Get(ctx, &ch, orm.Where("id=?", id)); err != nil {
return nil, err
}
return &ch, nil
}
func (g Adapter) GetDevice(ctx context.Context, id string) (*Device, error) {
var dev Device
if err := g.store.Device().Get(ctx, &dev, orm.Where("id=?", id)); err != nil {
return nil, err
}
return &dev, nil
}
+17 -16
View File
@@ -6,7 +6,7 @@ import (
"log/slog"
"strings"
gb28181 "github.com/gowvp/gb28181/internal/core/ipc"
"github.com/gowvp/gb28181/internal/core/ipc"
"github.com/gowvp/gb28181/pkg/gbs"
"github.com/gowvp/gb28181/pkg/gbs/sip"
"github.com/ixugo/goddd/pkg/conc"
@@ -16,11 +16,11 @@ import (
var (
_ gbs.MemoryStorer = &Cache{}
_ gb28181.Storer = &Cache{}
_ ipc.Storer = &Cache{}
)
type Cache struct {
gb28181.Storer
ipc.Storer
devices *conc.Map[string, *gbs.Device]
}
@@ -30,15 +30,15 @@ func (c *Cache) LoadOrStore(deviceID string, value *gbs.Device) {
c.devices.LoadOrStore(deviceID, value)
}
func (c *Cache) Device() gb28181.DeviceStorer {
func (c *Cache) Device() ipc.DeviceStorer {
return (*Device)(c)
}
func (c *Cache) Channel() gb28181.ChannelStorer {
func (c *Cache) Channel() ipc.ChannelStorer {
return (*Channel)(c)
}
func NewCache(store gb28181.Storer) *Cache {
func NewCache(store ipc.Storer) *Cache {
return &Cache{
Storer: store,
devices: &conc.Map[string, *gbs.Device]{},
@@ -47,8 +47,9 @@ func NewCache(store gb28181.Storer) *Cache {
// LoadDeviceToMemory implements gbs.MemoryStorer.
func (c *Cache) LoadDeviceToMemory(conn sip.Connection) {
devices := make([]*gb28181.Device, 0, 100)
_, err := c.Storer.Device().Find(context.TODO(), &devices, web.NewPagerFilterMaxSize())
// TODO: 加载 gb28181 设备
devices := make([]*ipc.Device, 0, 100)
_, err := c.Storer.Device().Find(context.TODO(), &devices, web.NewPagerFilterMaxSize(), orm.Where("type != ?", ipc.TypeOnvif))
if err != nil {
panic(err)
}
@@ -56,7 +57,7 @@ func (c *Cache) LoadDeviceToMemory(conn sip.Connection) {
for _, d := range devices {
if strings.ToLower(d.Transport) == "tcp" {
// 通知相关设备/通道离线
c.Change(d.DeviceID, func(d *gb28181.Device) {
c.Change(d.GetGB28181DeviceID(), func(d *ipc.Device) {
d.IsOnline = false
}, func(d *gbs.Device) {
d.IsOnline = false
@@ -67,18 +68,18 @@ func (c *Cache) LoadDeviceToMemory(conn sip.Connection) {
dev := gbs.NewDevice(conn, d)
if dev != nil {
if err := dev.CheckConnection(); err != nil {
slog.Warn("检查设备连接失败", "err", err, "device_id", d.DeviceID, "to", dev.To())
slog.Warn("检查设备连接失败", "err", err, "username", d.GetGB28181DeviceID(), "to", dev.To())
continue
}
slog.Debug("load device to memory", "device_id", d.DeviceID, "to", dev.To())
channels := make([]*gb28181.Channel, 0, 8)
_, err := c.Storer.Channel().Find(context.TODO(), &channels, web.NewPagerFilterMaxSize(), orm.Where("device_id=?", d.DeviceID))
slog.Debug("load device to memory", "username", d.GetGB28181DeviceID(), "to", dev.To())
channels := make([]*ipc.Channel, 0, 8)
_, err := c.Storer.Channel().Find(context.TODO(), &channels, web.NewPagerFilterMaxSize(), orm.Where("device_id=?", d.GetGB28181DeviceID()))
if err != nil {
panic(err)
}
dev.LoadChannels(channels...)
c.devices.Store(d.DeviceID, dev)
c.devices.Store(d.GetGB28181DeviceID(), dev)
}
}
}
@@ -89,8 +90,8 @@ func (c *Cache) RangeDevices(fn func(key string, value *gbs.Device) bool) {
}
// Change implements gbs.MemoryStorer.
func (c *Cache) Change(deviceID string, changeFn func(*gb28181.Device), changeFn2 func(*gbs.Device)) error {
var dev gb28181.Device
func (c *Cache) Change(deviceID string, changeFn func(*ipc.Device), changeFn2 func(*gbs.Device)) error {
var dev ipc.Device
if err := c.Storer.Device().Edit(context.TODO(), &dev, changeFn, orm.Where("device_id=?", deviceID)); err != nil {
return err
}
+19 -13
View File
@@ -3,16 +3,22 @@ package ipccache
import (
"context"
gb28181 "github.com/gowvp/gb28181/internal/core/ipc"
"github.com/gowvp/gb28181/internal/core/ipc"
"github.com/ixugo/goddd/pkg/orm"
"gorm.io/gorm"
)
var _ gb28181.ChannelStorer = &Channel{}
var _ ipc.ChannelStorer = &Channel{}
type Channel Cache
// Add implements gb28181.ChannelStorer.
func (c *Channel) Add(ctx context.Context, ch *gb28181.Channel) error {
// Session implements ipc.ChannelStorer.
func (c *Channel) Session(ctx context.Context, changeFns ...func(*gorm.DB) error) error {
return c.Storer.Channel().Session(ctx, changeFns...)
}
// Add implements ipc.ChannelStorer.
func (c *Channel) Add(ctx context.Context, ch *ipc.Channel) error {
if err := c.Storer.Channel().Add(ctx, ch); err != nil {
return err
}
@@ -23,27 +29,27 @@ func (c *Channel) Add(ctx context.Context, ch *gb28181.Channel) error {
return nil
}
// BatchEdit implements gb28181.ChannelStorer.
// BatchEdit implements ipc.ChannelStorer.
func (c *Channel) BatchEdit(ctx context.Context, field string, value any, opts ...orm.QueryOption) error {
return c.Storer.Channel().BatchEdit(ctx, field, value, opts...)
}
// Del implements gb28181.ChannelStorer.
func (c *Channel) Del(ctx context.Context, ch *gb28181.Channel, opts ...orm.QueryOption) error {
// Del implements ipc.ChannelStorer.
func (c *Channel) Del(ctx context.Context, ch *ipc.Channel, opts ...orm.QueryOption) error {
return c.Storer.Channel().Del(ctx, ch, opts...)
}
// Edit implements gb28181.ChannelStorer.
func (c *Channel) Edit(ctx context.Context, ch *gb28181.Channel, changeFn func(*gb28181.Channel), opts ...orm.QueryOption) error {
// Edit implements ipc.ChannelStorer.
func (c *Channel) Edit(ctx context.Context, ch *ipc.Channel, changeFn func(*ipc.Channel), opts ...orm.QueryOption) error {
return c.Storer.Channel().Edit(ctx, ch, changeFn, opts...)
}
// Find implements gb28181.ChannelStorer.
func (c *Channel) Find(ctx context.Context, chs *[]*gb28181.Channel, pager orm.Pager, opts ...orm.QueryOption) (int64, error) {
// Find implements ipc.ChannelStorer.
func (c *Channel) Find(ctx context.Context, chs *[]*ipc.Channel, pager orm.Pager, opts ...orm.QueryOption) (int64, error) {
return c.Storer.Channel().Find(ctx, chs, pager, opts...)
}
// Get implements gb28181.ChannelStorer.
func (c *Channel) Get(ctx context.Context, ch *gb28181.Channel, opts ...orm.QueryOption) error {
// Get implements ipc.ChannelStorer.
func (c *Channel) Get(ctx context.Context, ch *ipc.Channel, opts ...orm.QueryOption) error {
return c.Storer.Channel().Get(ctx, ch, opts...)
}
+18 -18
View File
@@ -5,28 +5,28 @@ import (
"fmt"
"log/slog"
gb28181 "github.com/gowvp/gb28181/internal/core/ipc"
"github.com/gowvp/gb28181/internal/core/ipc"
"github.com/gowvp/gb28181/pkg/gbs"
"github.com/ixugo/goddd/pkg/orm"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
var _ gb28181.DeviceStorer = &Device{}
var _ ipc.DeviceStorer = &Device{}
type Device = Cache
// Add implements gb28181.DeviceStorer.
func (d *Device) Add(ctx context.Context, dev *gb28181.Device) error {
// Add implements ipc.DeviceStorer.
func (d *Device) Add(ctx context.Context, dev *ipc.Device) error {
if err := d.Storer.Device().Add(ctx, dev); err != nil {
return err
}
d.devices.LoadOrStore(dev.DeviceID, gbs.NewDevice(nil, dev))
d.devices.LoadOrStore(dev.GetGB28181DeviceID(), gbs.NewDevice(nil, dev))
return nil
}
// Del implements gb28181.DeviceStorer.
func (d *Device) Del(ctx context.Context, dev *gb28181.Device, opts ...orm.QueryOption) error {
// Del implements ipc.DeviceStorer.
func (d *Device) Del(ctx context.Context, dev *ipc.Device, opts ...orm.QueryOption) error {
if err := d.Storer.Device().Session(
ctx,
func(tx *gorm.DB) error {
@@ -37,29 +37,29 @@ func (d *Device) Del(ctx context.Context, dev *gb28181.Device, opts ...orm.Query
return db.Delete(dev).Error
},
func(tx *gorm.DB) error {
return tx.Model(&gb28181.Channel{}).Where("did=?", dev.ID).Delete(nil).Error
return tx.Model(&ipc.Channel{}).Where("did=?", dev.ID).Delete(nil).Error
},
); err != nil {
return err
}
d.devices.Delete(dev.DeviceID)
d.devices.Delete(dev.GetGB28181DeviceID())
return nil
}
// Edit implements gb28181.DeviceStorer.
func (d *Device) Edit(ctx context.Context, dev *gb28181.Device, changeFn func(*gb28181.Device), opts ...orm.QueryOption) error {
// Edit implements ipc.DeviceStorer.
func (d *Device) Edit(ctx context.Context, dev *ipc.Device, changeFn func(*ipc.Device), opts ...orm.QueryOption) error {
if err := d.Storer.Device().Edit(ctx, dev, changeFn, opts...); err != nil {
return err
}
dev2, ok := d.devices.Load(dev.DeviceID)
dev2, ok := d.devices.Load(dev.GetGB28181DeviceID())
if !ok {
return fmt.Errorf("edit device not found")
}
// 密码修改,设备需要重新注册
if dev2.Password != dev.Password && dev.Password != "" {
slog.InfoContext(ctx, " 修改密码,设备离线")
d.Change(dev.DeviceID, func(d *gb28181.Device) {
d.Change(dev.GetGB28181DeviceID(), func(d *ipc.Device) {
d.Password = dev.Password
d.IsOnline = false
}, func(d *gbs.Device) {
@@ -68,17 +68,17 @@ func (d *Device) Edit(ctx context.Context, dev *gb28181.Device, changeFn func(*g
return nil
}
// Find implements gb28181.DeviceStorer.
func (d *Device) Find(ctx context.Context, devs *[]*gb28181.Device, pager orm.Pager, opts ...orm.QueryOption) (int64, error) {
// Find implements ipc.DeviceStorer.
func (d *Device) Find(ctx context.Context, devs *[]*ipc.Device, pager orm.Pager, opts ...orm.QueryOption) (int64, error) {
return d.Storer.Device().Find(ctx, devs, pager, opts...)
}
// Get implements gb28181.DeviceStorer.
func (d *Device) Get(ctx context.Context, dev *gb28181.Device, opts ...orm.QueryOption) error {
// Get implements ipc.DeviceStorer.
func (d *Device) Get(ctx context.Context, dev *ipc.Device, opts ...orm.QueryOption) error {
return d.Storer.Device().Get(ctx, dev, opts...)
}
// Session implements gb28181.DeviceStorer.
// Session implements ipc.DeviceStorer.
func (d *Device) Session(ctx context.Context, changeFns ...func(*gorm.DB) error) error {
return d.Storer.Device().Session(ctx, changeFns...)
}
+15 -15
View File
@@ -4,22 +4,22 @@ package ipcdb
import (
"context"
gb28181 "github.com/gowvp/gb28181/internal/core/ipc"
"github.com/gowvp/gb28181/internal/core/ipc"
"github.com/ixugo/goddd/pkg/orm"
"gorm.io/gorm"
)
var _ gb28181.ChannelStorer = Channel{}
var _ ipc.ChannelStorer = Channel{}
// Channel Related business namespaces
type Channel DB
// BatchEdit implements gb28181.ChannelStorer.
// BatchEdit implements ipc.ChannelStorer.
func (d Channel) BatchEdit(ctx context.Context, column string, value any, args ...orm.QueryOption) error {
if len(args) == 0 {
panic("没有查询条件")
}
db := d.db.WithContext(ctx).Model(new(gb28181.Channel))
db := d.db.WithContext(ctx).Model(new(ipc.Channel))
for _, fn := range args {
fn(db)
}
@@ -31,28 +31,28 @@ func NewChannel(db *gorm.DB) Channel {
return Channel{db: db}
}
// Find implements gb28181.ChannelStorer.
func (d Channel) Find(ctx context.Context, bs *[]*gb28181.Channel, page orm.Pager, opts ...orm.QueryOption) (int64, error) {
// Find implements ipc.ChannelStorer.
func (d Channel) Find(ctx context.Context, bs *[]*ipc.Channel, page orm.Pager, opts ...orm.QueryOption) (int64, error) {
return orm.FindWithContext(ctx, d.db, bs, page, opts...)
}
// Get implements gb28181.ChannelStorer.
func (d Channel) Get(ctx context.Context, model *gb28181.Channel, opts ...orm.QueryOption) error {
// Get implements ipc.ChannelStorer.
func (d Channel) Get(ctx context.Context, model *ipc.Channel, opts ...orm.QueryOption) error {
return orm.FirstWithContext(ctx, d.db, model, opts...)
}
// Add implements gb28181.ChannelStorer.
func (d Channel) Add(ctx context.Context, model *gb28181.Channel) error {
// Add implements ipc.ChannelStorer.
func (d Channel) Add(ctx context.Context, model *ipc.Channel) error {
return d.db.WithContext(ctx).Create(model).Error
}
// Edit implements gb28181.ChannelStorer.
func (d Channel) Edit(ctx context.Context, model *gb28181.Channel, changeFn func(*gb28181.Channel), opts ...orm.QueryOption) error {
// Edit implements ipc.ChannelStorer.
func (d Channel) Edit(ctx context.Context, model *ipc.Channel, changeFn func(*ipc.Channel), opts ...orm.QueryOption) error {
return orm.UpdateWithContext(ctx, d.db, model, changeFn, opts...)
}
// Del implements gb28181.ChannelStorer.
func (d Channel) Del(ctx context.Context, model *gb28181.Channel, opts ...orm.QueryOption) error {
// Del implements ipc.ChannelStorer.
func (d Channel) Del(ctx context.Context, model *ipc.Channel, opts ...orm.QueryOption) error {
return orm.DeleteWithContext(ctx, d.db, model, opts...)
}
@@ -67,6 +67,6 @@ func (d Channel) Session(ctx context.Context, changeFns ...func(*gorm.DB) error)
})
}
func (d Channel) EditWithSession(tx *gorm.DB, model *gb28181.Channel, changeFn func(b *gb28181.Channel) error, opts ...orm.QueryOption) error {
func (d Channel) EditWithSession(tx *gorm.DB, model *ipc.Channel, changeFn func(b *ipc.Channel) error, opts ...orm.QueryOption) error {
return orm.UpdateWithSession(tx, model, changeFn, opts...)
}
+6 -6
View File
@@ -2,11 +2,11 @@
package ipcdb
import (
gb28181 "github.com/gowvp/gb28181/internal/core/ipc"
"github.com/gowvp/gb28181/internal/core/ipc"
"gorm.io/gorm"
)
var _ gb28181.Storer = DB{}
var _ ipc.Storer = DB{}
// DB Related business namespaces
type DB struct {
@@ -19,12 +19,12 @@ func NewDB(db *gorm.DB) DB {
}
// Device Get business instance
func (d DB) Device() gb28181.DeviceStorer {
func (d DB) Device() ipc.DeviceStorer {
return Device(d)
}
// Channel Get business instance
func (d DB) Channel() gb28181.ChannelStorer {
func (d DB) Channel() ipc.ChannelStorer {
return Channel(d)
}
@@ -34,8 +34,8 @@ func (d DB) AutoMigrate(ok bool) DB {
return d
}
if err := d.db.AutoMigrate(
new(gb28181.Device),
new(gb28181.Channel),
new(ipc.Device),
new(ipc.Channel),
); err != nil {
panic(err)
}
+13 -13
View File
@@ -4,12 +4,12 @@ package ipcdb
import (
"context"
gb28181 "github.com/gowvp/gb28181/internal/core/ipc"
"github.com/gowvp/gb28181/internal/core/ipc"
"github.com/ixugo/goddd/pkg/orm"
"gorm.io/gorm"
)
var _ gb28181.DeviceStorer = Device{}
var _ ipc.DeviceStorer = Device{}
// Device Related business namespaces
type Device DB
@@ -19,28 +19,28 @@ func NewDevice(db *gorm.DB) Device {
return Device{db: db}
}
// Find implements gb28181.DeviceStorer.
func (d Device) Find(ctx context.Context, bs *[]*gb28181.Device, page orm.Pager, opts ...orm.QueryOption) (int64, error) {
// Find implements ipc.DeviceStorer.
func (d Device) Find(ctx context.Context, bs *[]*ipc.Device, page orm.Pager, opts ...orm.QueryOption) (int64, error) {
return orm.FindWithContext(ctx, d.db, bs, page, opts...)
}
// Get implements gb28181.DeviceStorer.
func (d Device) Get(ctx context.Context, model *gb28181.Device, opts ...orm.QueryOption) error {
// Get implements ipc.DeviceStorer.
func (d Device) Get(ctx context.Context, model *ipc.Device, opts ...orm.QueryOption) error {
return orm.FirstWithContext(ctx, d.db, model, opts...)
}
// Add implements gb28181.DeviceStorer.
func (d Device) Add(ctx context.Context, model *gb28181.Device) error {
// Add implements ipc.DeviceStorer.
func (d Device) Add(ctx context.Context, model *ipc.Device) error {
return d.db.WithContext(ctx).Create(model).Error
}
// Edit implements gb28181.DeviceStorer.
func (d Device) Edit(ctx context.Context, model *gb28181.Device, changeFn func(*gb28181.Device), opts ...orm.QueryOption) error {
// Edit implements ipc.DeviceStorer.
func (d Device) Edit(ctx context.Context, model *ipc.Device, changeFn func(*ipc.Device), opts ...orm.QueryOption) error {
return orm.UpdateWithContext(ctx, d.db, model, changeFn, opts...)
}
// Del implements gb28181.DeviceStorer.
func (d Device) Del(ctx context.Context, model *gb28181.Device, opts ...orm.QueryOption) error {
// Del implements ipc.DeviceStorer.
func (d Device) Del(ctx context.Context, model *ipc.Device, opts ...orm.QueryOption) error {
return orm.DeleteWithContext(ctx, d.db, model, opts...)
}
@@ -55,6 +55,6 @@ func (d Device) Session(ctx context.Context, changeFns ...func(*gorm.DB) error)
})
}
func (d Device) EditWithSession(tx *gorm.DB, model *gb28181.Device, changeFn func(b *gb28181.Device) error, opts ...orm.QueryOption) error {
func (d Device) EditWithSession(tx *gorm.DB, model *ipc.Device, changeFn func(b *ipc.Device) error, opts ...orm.QueryOption) error {
return orm.UpdateWithSession(tx, model, changeFn, opts...)
}
-50
View File
@@ -1,50 +0,0 @@
package port
import (
"context"
)
// Device 设备接口(避免循环依赖)
// 协议适配器的实现会接收具体的 gb28181.Device 类型
type Device interface {
GetID() string
GetDeviceID() string
GetType() string
GetIP() string
GetPort() int
GetUsername() string
GetPassword() string
}
// Channel 通道接口(避免循环依赖)
type Channel interface {
GetID() string
GetChannelID() string
GetDeviceID() string
}
// Protocol 协议抽象接口(端口)
type Protocol interface {
// ValidateDevice 验证设备连接(添加设备前调用)
ValidateDevice(ctx context.Context, device Device) error
// InitDevice 初始化设备连接(添加设备后调用)
// 例如: GB28181 不需要主动初始化,ONVIF 需要查询 Profiles 作为通道
InitDevice(ctx context.Context, device Device) error
// QueryCatalog 查询设备目录/通道
QueryCatalog(ctx context.Context, device Device) error
// StartPlay 开始播放
StartPlay(ctx context.Context, device Device, channel Channel) (*PlayResponse, error)
// StopPlay 停止播放
StopPlay(ctx context.Context, device Device, channel Channel) error
}
// PlayResponse 播放响应
type PlayResponse struct {
SSRC string // GB28181 SSRC
Stream string // 流 ID
RTSP string // RTSP 地址 (ONVIF)
}
+2
View File
@@ -194,6 +194,7 @@ func (n *NodeManager) connection(server *MediaServer, serverPort int) error {
log.Info("ZLM 服务节点配置设置")
hookPrefix := fmt.Sprintf("http://%s:%d/webhook", server.HookIP, serverPort)
req := zlm.SetServerConfigRequest{
RtcExternIP: zlm.NewString(server.IP),
GeneralMediaServerID: zlm.NewString(server.ID),
@@ -232,6 +233,7 @@ func (n *NodeManager) connection(server *MediaServer, serverPort int) error {
// 优化此消息以更快的收到流注销事件
ProtocolContinuePushMs: zlm.NewString("3000"),
RtpProxyPortRange: &server.RTPPortRange,
FfmpegLog: zlm.NewString("./fflogs/ffmpeg.log"),
}
{
-157
View File
@@ -1,157 +0,0 @@
package onvifadapter
import (
"context"
"fmt"
"log/slog"
gb28181 "github.com/gowvp/gb28181/internal/core/ipc"
"github.com/gowvp/gb28181/internal/core/port"
"github.com/gowvp/onvif"
m "github.com/gowvp/onvif/media"
sdkmedia "github.com/gowvp/onvif/sdk/media"
"github.com/ixugo/goddd/pkg/orm"
)
var _ port.Protocol = (*Adapter)(nil)
// Adapter ONVIF 协议适配器
type Adapter struct {
devices map[string]*onvif.Device // ONVIF 设备连接缓存
store gb28181.Storer // 协议适配器可以依赖领域的存储接口
}
func NewAdapter(store gb28181.Storer) *Adapter {
return &Adapter{
devices: make(map[string]*onvif.Device),
store: store,
}
}
// ValidateDevice 实现 port.Protocol 接口 - ONVIF 设备验证
func (a *Adapter) ValidateDevice(ctx context.Context, device port.Device) error {
// 尝试连接 ONVIF 设备并验证可以获取 Profiles
dev, err := onvif.NewDevice(onvif.DeviceParams{
Xaddr: fmt.Sprintf("%s:%d", device.GetIP(), device.GetPort()),
Username: device.GetUsername(),
Password: device.GetPassword(),
})
if err != nil {
return fmt.Errorf("ONVIF 连接失败: %w", err)
}
// 验证可以获取 Profiles
_, err = sdkmedia.Call_GetProfiles(ctx, dev, m.GetProfiles{})
if err != nil {
return fmt.Errorf("获取 ONVIF Profiles 失败: %w", err)
}
return nil
}
// InitDevice 实现 port.Protocol 接口 - 初始化 ONVIF 设备
// ONVIF 设备初始化时,自动查询 Profiles 并创建为通道
func (a *Adapter) InitDevice(ctx context.Context, device port.Device) error {
// 创建 ONVIF 连接
dev, err := onvif.NewDevice(onvif.DeviceParams{
Xaddr: fmt.Sprintf("%s:%d", device.GetIP(), device.GetPort()),
Username: device.GetUsername(),
Password: device.GetPassword(),
})
if err != nil {
return err
}
// 缓存设备连接
a.devices[device.GetID()] = dev
// 自动查询 Profiles 作为通道
return a.queryAndSaveProfiles(ctx, device, dev)
}
// QueryCatalog 实现 port.Protocol 接口 - ONVIF 查询 Profiles
func (a *Adapter) QueryCatalog(ctx context.Context, device port.Device) error {
dev, ok := a.devices[device.GetID()]
if !ok {
// 设备连接不在缓存中,尝试重新连接
var err error
dev, err = onvif.NewDevice(onvif.DeviceParams{
Xaddr: fmt.Sprintf("%s:%d", device.GetIP(), device.GetPort()),
Username: device.GetUsername(),
Password: device.GetPassword(),
})
if err != nil {
return fmt.Errorf("ONVIF 设备未初始化: %w", err)
}
a.devices[device.GetID()] = dev
}
return a.queryAndSaveProfiles(ctx, device, dev)
}
// StartPlay 实现 port.Protocol 接口 - ONVIF 播放
func (a *Adapter) StartPlay(ctx context.Context, device port.Device, channel port.Channel) (*port.PlayResponse, error) {
dev, ok := a.devices[device.GetID()]
if !ok {
return nil, fmt.Errorf("ONVIF 设备未初始化")
}
// 获取 RTSP 地址
streamURI, err := a.getStreamURI(ctx, dev, channel.GetChannelID())
if err != nil {
return nil, err
}
return &port.PlayResponse{
RTSP: streamURI,
}, nil
}
// StopPlay 实现 port.Protocol 接口 - ONVIF 停止播放
func (a *Adapter) StopPlay(ctx context.Context, device port.Device, channel port.Channel) error {
// ONVIF 通常不需要显式停止播放
return nil
}
// queryAndSaveProfiles 查询 ONVIF Profiles 并保存为通道
func (a *Adapter) queryAndSaveProfiles(ctx context.Context, device port.Device, dev *onvif.Device) error {
// 查询 ONVIF Profiles
resp, err := sdkmedia.Call_GetProfiles(ctx, dev, m.GetProfiles{})
if err != nil {
return fmt.Errorf("获取 ONVIF Profiles 失败: %w", err)
}
// 将 Profiles 转换为通道并保存
for _, profile := range resp.Profiles {
channel := &gb28181.Channel{
DeviceID: device.GetDeviceID(),
ChannelID: string(profile.Token),
Name: string(profile.Name),
DID: device.GetID(),
}
// 保存到数据库(使用领域层的存储接口)
if err := a.store.Channel().Add(ctx, channel); err != nil {
// 如果是重复错误,忽略
if orm.IsDuplicatedKey(err) {
slog.DebugContext(ctx, "通道已存在", "channel_id", channel.ChannelID)
continue
}
slog.ErrorContext(ctx, "保存通道失败", "err", err, "channel_id", channel.ChannelID)
continue
}
slog.InfoContext(ctx, "ONVIF Profile 保存为通道", "channel_id", channel.ChannelID, "name", channel.Name)
}
return nil
}
// getStreamURI 获取 RTSP 流地址
func (a *Adapter) getStreamURI(ctx context.Context, dev *onvif.Device, profileToken string) (string, error) {
// TODO: 调用 ONVIF GetStreamUri 方法
// 这里需要根据 onvif SDK 的实际 API 来实现
// 临时实现:假设 profileToken 可以直接构造 RTSP 地址
params := dev.GetDeviceParams()
return fmt.Sprintf("rtsp://%s:%s@%s/stream/%s", params.Username, params.Password, params.Xaddr, profileToken), nil
}
+40 -48
View File
@@ -2,25 +2,24 @@
package api
import (
"encoding/json"
"fmt"
"io"
"log/slog"
"os"
"path/filepath"
"slices"
"sort"
"strings"
"time"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"github.com/gowvp/gb28181/internal/adapter/onvifadapter"
"github.com/gowvp/gb28181/internal/core/bz"
"github.com/gowvp/gb28181/internal/core/ipc"
"github.com/gowvp/gb28181/internal/core/port"
"github.com/gowvp/gb28181/internal/core/push"
"github.com/gowvp/gb28181/internal/core/sms"
"github.com/gowvp/gb28181/pkg/zlm"
"github.com/gowvp/onvif"
"github.com/ixugo/goddd/domain/uniqueid"
"github.com/ixugo/goddd/pkg/hook"
"github.com/ixugo/goddd/pkg/orm"
@@ -61,7 +60,7 @@ func NewIPCAPI(core ipc.Core) IPCAPI {
return IPCAPI{ipc: core}
}
func NewIPCCore(store ipc.Storer, uni uniqueid.Core, protocols map[string]port.Protocol) ipc.Core {
func NewIPCCore(store ipc.Storer, uni uniqueid.Core, protocols map[string]ipc.Protocoler) ipc.Core {
return ipc.NewCore(store, uni, protocols)
}
@@ -92,8 +91,9 @@ func registerGB28181(g gin.IRouter, api IPCAPI, handler ...gin.HandlerFunc) {
group.POST("/:id/catalog", web.WrapH(api.queryCatalog)) // 刷新通道(GB28181 特有)
}
{
group := g.Group("/onvif", handler...)
group.GET("/discover", api.discover) // ONVIF 设备发现(ONVIF 特有)
// group := g.Group("/onvif", handler...)
g.GET("/onvif/discover", api.discover) // ONVIF 设备发现(ONVIF 特有)
// group.GET("/discover", api.discover) // ONVIF 设备发现(ONVIF 特有)
}
// 统一的通道管理 API(支持所有协议)
@@ -137,6 +137,10 @@ func (a IPCAPI) editDevice(c *gin.Context, in *ipc.EditDeviceInput) (any, error)
// POST /devices
// { "type": "ONVIF", "ip": "192.168.1.100", "port": 80, "username": "admin", "password": "12345" }
func (a IPCAPI) addDevice(c *gin.Context, in *ipc.AddDeviceInput) (any, error) {
in.Type = strings.ToUpper(in.Type)
if !slices.Contains([]string{ipc.TypeGB28181, ipc.TypeOnvif}, in.Type) {
return nil, reason.ErrBadRequest.SetMsg("不支持的设备类型")
}
return a.ipc.AddDevice(c.Request.Context(), in)
}
@@ -196,7 +200,7 @@ func (a IPCAPI) play(c *gin.Context, _ *struct{}) (*playOutput, error) {
var app, appStream, host, stream, session, mediaServerID string
// 国标逻辑
if strings.HasPrefix(channelID, bz.IDPrefixGBChannel) {
if bz.IsGB28181(channelID) {
// 防止错误的配置,无法收到流
if a.uc.Conf.Media.SDPIP == "127.0.0.1" {
return nil, reason.ErrUsedLogic.SetMsg("请先配置流媒体 SDP 收流地址")
@@ -212,7 +216,7 @@ func (a IPCAPI) play(c *gin.Context, _ *struct{}) (*playOutput, error) {
mediaServerID = sms.DefaultMediaServerID
} else if strings.HasPrefix(channelID, bz.IDPrefixRTMP) {
} else if bz.IsRTMP(channelID) {
pu, err := a.uc.MediaAPI.pushCore.GetStreamPush(c.Request.Context(), channelID)
if err != nil {
return nil, err
@@ -227,7 +231,7 @@ func (a IPCAPI) play(c *gin.Context, _ *struct{}) (*playOutput, error) {
if !pu.IsAuthDisabled && pu.Session != "" {
session = "session=" + pu.Session
}
} else if strings.HasPrefix(channelID, bz.IDPrefixRTSP) {
} else if bz.IsRTSP(channelID) {
proxy, err := a.uc.ProxyAPI.proxyCore.GetStreamProxy(c.Request.Context(), channelID)
if err != nil {
return nil, err
@@ -235,6 +239,10 @@ func (a IPCAPI) play(c *gin.Context, _ *struct{}) (*playOutput, error) {
app = proxy.App
appStream = proxy.Stream
mediaServerID = sms.DefaultMediaServerID
} else if bz.IsOnvif(channelID) {
app = "rtp"
appStream = channelID
mediaServerID = sms.DefaultMediaServerID
} else {
return nil, reason.ErrNotFound.SetMsg("不支持的播放通道")
}
@@ -299,7 +307,7 @@ func (a IPCAPI) play(c *gin.Context, _ *struct{}) (*playOutput, error) {
// 取一张快照
go func() {
for range 2 {
time.Sleep(5 * time.Second)
time.Sleep(3 * time.Second)
rtsp := fmt.Sprintf("rtsp://%s:%d/%s", "127.0.0.1", svr.Ports.RTSP, stream) + "?" + session
body, err := a.uc.SMSAPI.smsCore.GetSnapshot(svr, zlm.GetSnapRequest{
URL: rtsp,
@@ -382,27 +390,13 @@ func (a IPCAPI) getSnapshot(c *gin.Context) {
c.Data(200, "image/jpeg", body)
}
type DiscoverResponse struct {
Addr string `json:"addr"`
}
func toDiscoverResponse(dev *onvif.Device) *DiscoverResponse {
addr := dev.GetDeviceParams().Xaddr
if !strings.Contains(addr, ":") {
addr += ":80"
}
return &DiscoverResponse{
Addr: addr,
}
}
func (a IPCAPI) discover(c *gin.Context) {
recv, cancel, err := onvif.AllAvailableDevicesAtSpecificEthernetInterfaces()
if err != nil {
web.Fail(c, err)
p := a.ipc.GetProtocol(ipc.TypeOnvif)
onvifAdapter, ok := p.(*onvifadapter.Adapter)
if !ok {
web.Fail(c, reason.ErrNotFound.SetMsg("不支持的协议"))
return
}
defer cancel()
se := web.NewSSE(64, time.Minute)
go func() {
@@ -413,27 +407,25 @@ func (a IPCAPI) discover(c *gin.Context) {
})
se.Close()
}()
for {
select {
case dev := <-recv:
if dev == nil {
return
}
// TODO: 已经添加的设备需要过滤掉
b, _ := json.Marshal(toDiscoverResponse(dev))
se.Publish(web.Event{
ID: uuid.NewString(),
Event: "discover",
Data: b,
})
time.Sleep(time.Millisecond * 200)
case <-c.Request.Context().Done():
return
case <-time.After(3 * time.Second):
slog.DebugContext(c.Request.Context(), "discover timeout")
return
}
w := IOWriter{fn: func(b []byte) (int, error) {
se.Publish(web.Event{
ID: uuid.NewString(),
Event: "discover",
Data: b,
})
return len(b), nil
}}
if err := onvifAdapter.Discover(c.Request.Context(), w); err != nil {
slog.ErrorContext(c.Request.Context(), "discover", "err", err)
}
}()
se.ServeHTTP(c.Writer, c.Request)
}
type IOWriter struct {
fn func(b []byte) (int, error)
}
func (w IOWriter) Write(b []byte) (int, error) {
return w.fn(b)
}
+13 -14
View File
@@ -5,14 +5,17 @@ import (
"github.com/gin-gonic/gin"
"github.com/google/wire"
"github.com/gowvp/gb28181/internal/adapter/gbadapter"
"github.com/gowvp/gb28181/internal/adapter/onvifadapter"
"github.com/gowvp/gb28181/internal/adapter/rtspadapter"
"github.com/gowvp/gb28181/internal/conf"
"github.com/gowvp/gb28181/internal/core/ipc"
"github.com/gowvp/gb28181/internal/core/ipc/store/ipccache"
"github.com/gowvp/gb28181/internal/core/ipc/store/ipcdb"
"github.com/gowvp/gb28181/internal/core/port"
"github.com/gowvp/gb28181/internal/core/proxy"
"github.com/gowvp/gb28181/internal/core/push"
"github.com/gowvp/gb28181/internal/core/push/store/pushdb"
onvifadapter "github.com/gowvp/gb28181/internal/protocol/onvif"
"github.com/gowvp/gb28181/internal/core/sms"
"github.com/gowvp/gb28181/pkg/gbs"
"github.com/ixugo/goddd/domain/uniqueid"
"github.com/ixugo/goddd/domain/uniqueid/store/uniqueiddb"
@@ -34,7 +37,7 @@ var (
NewPushCore, NewPushAPI,
gbs.NewServer,
NewIPCStore, NewProtocols, NewIPCCore, NewIPCAPI, NewGBAdapter,
NewProxyAPI,
NewProxyAPI, NewProxyCore,
NewConfigAPI,
NewUserAPI,
)
@@ -95,22 +98,18 @@ func NewIPCStore(db *gorm.DB) ipc.Storer {
return ipccache.NewCache(ipcdb.NewDB(db).AutoMigrate(orm.GetEnabledAutoMigrate()))
}
func NewGBAdapter(store ipc.Storer, uni uniqueid.Core) ipc.GBDAdapter {
return ipc.NewGBAdapter(
func NewGBAdapter(store ipc.Storer, uni uniqueid.Core) ipc.Adapter {
return ipc.NewAdapter(
store,
uni,
)
}
// NewProtocols 创建协议适配器映射
func NewProtocols(store ipc.Storer) map[string]port.Protocol {
protocols := make(map[string]port.Protocol)
// 注册 ONVIF 协议适配器
protocols["ONVIF"] = onvifadapter.NewAdapter(store)
// TODO: 注册 GB28181 协议适配器
// protocols["GB28181"] = gb28181adapter.NewAdapter(sipServer, store)
func NewProtocols(adapter ipc.Adapter, sms sms.Core, proxyCore *proxy.Core, gbs *gbs.Server) map[string]ipc.Protocoler {
protocols := make(map[string]ipc.Protocoler)
protocols[ipc.TypeOnvif] = onvifadapter.NewAdapter(adapter, sms)
protocols[ipc.TypeRTSP] = rtspadapter.NewAdapter(proxyCore, sms)
protocols[ipc.TypeGB28181] = gbadapter.NewAdapter(adapter, gbs, sms)
return protocols
}
+6 -3
View File
@@ -15,9 +15,12 @@ type ProxyAPI struct {
proxyCore *proxy.Core
}
func NewProxyAPI(db *gorm.DB, uni uniqueid.Core) ProxyAPI {
core := proxy.NewCore(proxydb.NewDB(db).AutoMigrate(orm.GetEnabledAutoMigrate()), uni)
return ProxyAPI{proxyCore: core}
func NewProxyCore(db *gorm.DB, uni uniqueid.Core) *proxy.Core {
return proxy.NewCore(proxydb.NewDB(db).AutoMigrate(orm.GetEnabledAutoMigrate()), uni)
}
func NewProxyAPI(proxyCore *proxy.Core) ProxyAPI {
return ProxyAPI{proxyCore: proxyCore}
}
func registerProxy(g gin.IRouter, api ProxyAPI, handler ...gin.HandlerFunc) {
+1 -1
View File
@@ -18,7 +18,7 @@ type SmsAPI struct {
}
func NewSMSCore(db *gorm.DB, cfg *conf.Bootstrap) sms.Core {
core := sms.NewCore(smsdb.NewDB(db).AutoMigrate(orm.EnabledAutoMigrate))
core := sms.NewCore(smsdb.NewDB(db).AutoMigrate(orm.GetEnabledAutoMigrate()))
if err := core.Run(cfg, cfg.Server.HTTP.Port); err != nil {
panic(err)
}
+28 -112
View File
@@ -1,33 +1,32 @@
package api
import (
"context"
"log/slog"
"net/url"
"strings"
"github.com/gin-gonic/gin"
"github.com/gowvp/gb28181/internal/conf"
"github.com/gowvp/gb28181/internal/core/bz"
gb28181 "github.com/gowvp/gb28181/internal/core/ipc"
"github.com/gowvp/gb28181/internal/core/ipc"
"github.com/gowvp/gb28181/internal/core/push"
"github.com/gowvp/gb28181/internal/core/sms"
"github.com/gowvp/gb28181/pkg/gbs"
"github.com/gowvp/gb28181/pkg/zlm"
"github.com/ixugo/goddd/pkg/web"
)
type WebHookAPI struct {
smsCore sms.Core
mediaCore push.Core
gb28181Core gb28181.Core
gb28181Core ipc.Core
conf *conf.Bootstrap
log *slog.Logger
gbs *gbs.Server
uc *Usecase
protocols map[string]ipc.Protocoler
}
func NewWebHookAPI(core sms.Core, mediaCore push.Core, conf *conf.Bootstrap, gbs *gbs.Server, gb28181 gb28181.Core) WebHookAPI {
func NewWebHookAPI(core sms.Core, mediaCore push.Core, conf *conf.Bootstrap, gbs *gbs.Server, gb28181 ipc.Core, protocols map[string]ipc.Protocoler) WebHookAPI {
return WebHookAPI{
smsCore: core,
mediaCore: mediaCore,
@@ -35,6 +34,7 @@ func NewWebHookAPI(core sms.Core, mediaCore push.Core, conf *conf.Bootstrap, gbs
log: slog.With("hook", "zlm"),
gbs: gbs,
gb28181Core: gb28181,
protocols: protocols,
}
}
@@ -88,29 +88,23 @@ func (w WebHookAPI) onPublish(c *gin.Context, in *onPublishInput) (*onPublishOut
// https://docs.zlmediakit.com/zh/guide/media_server/web_hook_api.html#_12%E3%80%81on-stream-changed
func (w WebHookAPI) onStreamChanged(c *gin.Context, in *onStreamChangedInput) (DefaultOutput, error) {
w.log.InfoContext(c.Request.Context(), "webhook onStreamChanged", "app", in.App, "stream", in.Stream, "schema", in.Schema, "mediaServerID", in.MediaServerID, "regist", in.Regist)
if in.App == "rtp" {
// 防止多次触发
if in.Schema == "rtmp" && !in.Regist {
ch, err := w.gb28181Core.GetChannel(c.Request.Context(), in.Stream)
if err != nil {
w.log.WarnContext(c.Request.Context(), "webhook onStreamChanged", "err", err)
return newDefaultOutputOK(), nil
}
w.gbs.StopPlay(&gbs.StopPlayInput{Channel: ch})
if in.Regist || in.Schema != "rtmp" {
return newDefaultOutputOK(), nil
}
// TODO: 待重构
if bz.IsRTMP(in.Stream) {
if err := w.mediaCore.UnPublish(c.Request.Context(), in.App, in.Stream); err != nil {
w.log.ErrorContext(c.Request.Context(), "webhook onStreamChanged", "err", err)
}
return newDefaultOutputOK(), nil
}
if strings.HasPrefix(in.Stream, bz.IDPrefixRTSP) {
return newDefaultOutputOK(), nil
}
switch in.Schema {
case "rtmp":
if !in.Regist {
if err := w.mediaCore.UnPublish(c.Request.Context(), in.App, in.Stream); err != nil {
w.log.ErrorContext(c.Request.Context(), "webhook onStreamChanged", "err", err)
}
r := ipc.GetType(in.Stream)
protocol, ok := w.protocols[r]
if ok {
if err := protocol.OnStreamChanged(c.Request.Context(), in.Stream); err != nil {
slog.ErrorContext(c.Request.Context(), "webhook onStreamChanged", "err", err)
}
}
return newDefaultOutputOK(), nil
@@ -155,16 +149,6 @@ func (w WebHookAPI) onPlay(c *gin.Context, in *onPublishInput) (DefaultOutput, e
func (w WebHookAPI) onStreamNoneReader(c *gin.Context, in *onStreamNoneReaderInput) (onStreamNoneReaderOutput, error) {
// rtmp 无人观看时,也允许推流
w.log.InfoContext(c.Request.Context(), "webhook onStreamNoneReader", "app", in.App, "stream", in.Stream, "mediaServerID", in.MediaServerID)
if in.App == "rtp" {
ch, err := w.gb28181Core.GetChannel(c.Request.Context(), in.Stream)
if err != nil {
w.log.WarnContext(c.Request.Context(), "webhook onStreamNoneReader", "err", err)
return onStreamNoneReaderOutput{Close: true}, nil
}
_ = w.gbs.StopPlay(&gbs.StopPlayInput{Channel: ch})
} else if strings.HasPrefix(in.Stream, bz.IDPrefixRTSP) {
}
// 存在录像计划时,不关闭流
return onStreamNoneReaderOutput{Close: true}, nil
}
@@ -179,86 +163,18 @@ func (w WebHookAPI) onRTPServerTimeout(c *gin.Context, in *onRTPServerTimeoutInp
func (w WebHookAPI) onStreamNotFound(c *gin.Context, in *onStreamNotFoundInput) (DefaultOutput, error) {
w.log.InfoContext(c.Request.Context(), "webhook onStreamNotFound", "app", in.App, "stream", in.Stream, "schema", in.Schema, "mediaServerID", in.MediaServerID)
// 防止重复触发
if in.Schema != "rtmp" {
return newDefaultOutputOK(), nil
}
// 国标流处理
if in.App == "rtp" {
// 防止重复触发
if in.Schema != "rtmp" {
return newDefaultOutputOK(), nil
}
v := RTPStream{uc: w.uc}
if err := v.onStreamNotFound(c.Request.Context(), in); err != nil {
slog.ErrorContext(c.Request.Context(), "webhook onStreamNotFound", "err", err)
}
} else if strings.HasPrefix(in.Stream, bz.IDPrefixRTSP) {
v := RTSPStream{uc: w.uc}
if err := v.onStreamNotFound(c.Request.Context(), in); err != nil {
r := ipc.GetType(in.Stream)
protocol, ok := w.protocols[r]
if ok {
if err := protocol.OnStreamNotFound(c.Request.Context(), in.App, in.Stream); err != nil {
slog.ErrorContext(c.Request.Context(), "webhook onStreamNotFound", "err", err)
}
}
return newDefaultOutputOK(), nil
}
type RTPStream struct {
uc *Usecase
}
func (r RTPStream) onStreamNotFound(ctx context.Context, in *onStreamNotFoundInput) error {
ch, err := r.uc.GB28181API.ipc.GetChannel(ctx, in.Stream)
if err != nil {
return err
}
dev, err := r.uc.GB28181API.ipc.GetDevice(ctx, ch.DID)
if err != nil {
return err
}
svr, err := r.uc.SMSAPI.smsCore.GetMediaServer(ctx, sms.DefaultMediaServerID)
if err != nil {
return err
}
return r.uc.WebHookAPI.gbs.Play(&gbs.PlayInput{
Channel: ch,
StreamMode: dev.StreamMode,
SMS: svr,
})
}
type RTSPStream struct {
uc *Usecase
}
func (r RTSPStream) onStreamNotFound(ctx context.Context, in *onStreamNotFoundInput) error {
proxy, err := r.uc.ProxyAPI.proxyCore.GetStreamProxy(ctx, in.Stream)
if err != nil {
return err
}
svr, err := r.uc.SMSAPI.smsCore.GetMediaServer(ctx, sms.DefaultMediaServerID)
if err != nil {
return err
}
resp, err := r.uc.SMSAPI.smsCore.AddStreamProxy(svr, zlm.AddStreamProxyRequest{
Vhost: "__defaultVhost__",
App: proxy.App,
Stream: proxy.Stream,
URL: proxy.SourceURL,
RetryCount: 3,
RTPType: proxy.Transport,
TimeoutSec: 10,
EnableHLSFMP4: zlm.NewBool(true),
EnableAudio: zlm.NewBool(true),
EnableRTSP: zlm.NewBool(true),
EnableRTMP: zlm.NewBool(true),
AddMuteAudio: zlm.NewBool(true),
AutoClose: zlm.NewBool(true),
})
if err != nil {
return err
}
// 用于关闭
r.uc.ProxyAPI.proxyCore.EditStreamProxyKey(ctx, resp.Data.Key, proxy.ID)
return nil
}
+1 -1
View File
@@ -44,7 +44,7 @@ type Device struct {
}
func NewDevice(conn sip.Connection, d *ipc.Device) *Device {
uri, err := sip.ParseURI(fmt.Sprintf("sip:%s@%s", d.DeviceID, d.Address))
uri, err := sip.ParseURI(fmt.Sprintf("sip:%s@%s", d.GetGB28181DeviceID(), d.Address))
if err != nil {
slog.Error("parse uri", "err", err, "did", d.ID)
return nil
+4 -3
View File
@@ -1,6 +1,7 @@
package gbs
import (
"context"
"fmt"
"log/slog"
"net"
@@ -47,7 +48,7 @@ func (g *GB28181API) stopPlay(ch *Channel, in *StopPlayInput) error {
}
// StopPlay 加锁的停止播放
func (g *GB28181API) StopPlay(in *StopPlayInput) error {
func (g *GB28181API) StopPlay(ctx context.Context, in *StopPlayInput) error {
ch, ok := g.svr.memoryStorer.GetChannel(in.Channel.DeviceID, in.Channel.ChannelID)
if !ok {
return ErrDeviceNotExist
@@ -57,7 +58,7 @@ func (g *GB28181API) StopPlay(in *StopPlayInput) error {
defer ch.device.playMutex.Unlock()
defer func() {
g.svr.gb.core.EditPlaying(in.Channel.DeviceID, in.Channel.ChannelID, false)
g.svr.gb.core.EditPlaying(ctx, in.Channel.DeviceID, in.Channel.ChannelID, false)
}()
return g.stopPlay(ch, in)
}
@@ -109,7 +110,7 @@ func (g *GB28181API) Play(in *PlayInput) error {
return err
}
g.svr.gb.core.EditPlaying(in.Channel.DeviceID, in.Channel.ChannelID, true)
g.svr.gb.core.EditPlaying(context.TODO(), in.Channel.DeviceID, in.Channel.ChannelID, true)
return nil
}
+27 -25
View File
@@ -9,7 +9,7 @@ import (
"unicode"
"github.com/gowvp/gb28181/internal/conf"
gb28181 "github.com/gowvp/gb28181/internal/core/ipc"
"github.com/gowvp/gb28181/internal/core/ipc"
"github.com/gowvp/gb28181/internal/core/sms"
"github.com/gowvp/gb28181/pkg/gbs/sip"
"github.com/ixugo/goddd/pkg/conc"
@@ -20,7 +20,7 @@ const ignorePassword = "#"
type GB28181API struct {
cfg *conf.SIP
core gb28181.GBDAdapter
core ipc.Adapter
catalog *sip.Collector[Channels]
@@ -32,7 +32,7 @@ type GB28181API struct {
sms *sms.NodeManager
}
func NewGB28181API(cfg *conf.Bootstrap, store gb28181.GBDAdapter, sms *sms.NodeManager) *GB28181API {
func NewGB28181API(cfg *conf.Bootstrap, store ipc.Adapter, sms *sms.NodeManager) *GB28181API {
g := GB28181API{
cfg: &cfg.Sip,
core: store,
@@ -56,32 +56,35 @@ func NewGB28181API(cfg *conf.Bootstrap, store gb28181.GBDAdapter, sms *sms.NodeM
// }
// }
ipc, ok := g.svr.memoryStorer.Load(s)
d, ok := g.svr.memoryStorer.Load(s)
if ok {
for _, ch := range channel {
ch := Channel{
ChannelID: ch.ChannelID,
device: ipc,
device: d,
}
ch.init(g.cfg.Domain)
ipc.Channels.Store(ch.ChannelID, &ch)
d.Channels.Store(ch.ChannelID, &ch)
}
}
out := make([]*gb28181.Channel, len(channel))
out := make([]*ipc.Channel, len(channel))
for i, ch := range channel {
out[i] = &gb28181.Channel{
out[i] = &ipc.Channel{
DeviceID: s,
ChannelID: ch.ChannelID,
Name: ch.Name,
IsOnline: ch.Status == "OK" || ch.Status == "ON",
Ext: gb28181.DeviceExt{
Ext: ipc.DeviceExt{
Manufacturer: ch.Manufacturer,
Model: ch.Model,
},
Type: ipc.TypeGB28181,
}
}
g.core.SaveChannels(out)
if err := g.core.SaveChannels(out); err != nil {
slog.Error("SaveChannels", "err", err)
}
})
return &g
}
@@ -131,19 +134,18 @@ func (g *GB28181API) handlerRegister(ctx *sip.Context) {
password = ""
}
hdrs := ctx.Request.GetHeaders("Authorization")
if len(hdrs) == 0 {
resp := sip.NewResponseFromRequest("", ctx.Request, http.StatusUnauthorized, http.StatusText(http.StatusUnauthorized), nil)
resp.AppendHeader(&sip.GenericHeader{HeaderName: "WWW-Authenticate", Contents: fmt.Sprintf(`Digest realm="%s",qop="auth",nonce="%s"`, g.cfg.Domain, sip.RandString(32))})
_ = ctx.Tx.Respond(resp)
return
}
if password != "" {
hdrs := ctx.Request.GetHeaders("Authorization")
if len(hdrs) == 0 {
resp := sip.NewResponseFromRequest("", ctx.Request, http.StatusUnauthorized, http.StatusText(http.StatusUnauthorized), nil)
resp.AppendHeader(&sip.GenericHeader{HeaderName: "WWW-Authenticate", Contents: fmt.Sprintf(`Digest realm="%s",qop="auth",nonce="%s"`, g.cfg.Domain, sip.RandString(32))})
_ = ctx.Tx.Respond(resp)
return
}
authenticateHeader := hdrs[0].(*sip.GenericHeader)
auth := sip.AuthFromValue(authenticateHeader.Contents)
auth.SetPassword(password)
auth.SetUsername(dev.DeviceID)
auth.SetUsername(dev.GetGB28181DeviceID())
auth.SetMethod(ctx.Request.Method())
auth.SetURI(auth.Get("uri"))
if auth.CalcResponse() != auth.Get("response") {
@@ -165,7 +167,7 @@ func (g *GB28181API) handlerRegister(ctx *sip.Context) {
expire := ctx.GetHeader("Expires")
if expire == "0" {
ctx.Log.Info("设备注销")
g.logout(ctx.DeviceID, func(b *gb28181.Device) {
g.logout(ctx.DeviceID, func(b *ipc.Device) {
b.IsOnline = false
b.Address = ctx.Source.String()
})
@@ -173,7 +175,7 @@ func (g *GB28181API) handlerRegister(ctx *sip.Context) {
return
}
g.login(ctx, func(b *gb28181.Device) {
g.login(ctx, func(b *ipc.Device) {
b.IsOnline = true
b.RegisteredAt = orm.Now()
b.KeepaliveAt = orm.Now()
@@ -192,11 +194,11 @@ func (g *GB28181API) handlerRegister(ctx *sip.Context) {
respFn()
g.QueryDeviceInfo(ctx)
_ = g.QueryCatalog(dev.DeviceID)
_ = g.QueryConfigDownloadBasic(dev.DeviceID)
_ = g.QueryCatalog(dev.GetGB28181DeviceID())
_ = g.QueryConfigDownloadBasic(dev.GetGB28181DeviceID())
}
func (g GB28181API) login(ctx *sip.Context, fn func(d *gb28181.Device)) {
func (g GB28181API) login(ctx *sip.Context, fn func(d *ipc.Device)) {
slog.Info("status change 设备上线", "device_id", ctx.DeviceID)
g.svr.memoryStorer.Change(ctx.DeviceID, fn, func(d *Device) {
d.conn = ctx.Request.GetConnection()
@@ -205,7 +207,7 @@ func (g GB28181API) login(ctx *sip.Context, fn func(d *gb28181.Device)) {
})
}
func (g GB28181API) logout(deviceID string, changeFn func(*gb28181.Device)) error {
func (g GB28181API) logout(deviceID string, changeFn func(*ipc.Device)) error {
slog.Info("status change 设备离线", "device_id", deviceID)
return g.svr.memoryStorer.Change(deviceID, changeFn, func(d *Device) {
d.Expires = 0
+15 -12
View File
@@ -11,7 +11,8 @@ import (
"time"
"github.com/gowvp/gb28181/internal/conf"
gb28181 "github.com/gowvp/gb28181/internal/core/ipc"
"github.com/gowvp/gb28181/internal/core/bz"
"github.com/gowvp/gb28181/internal/core/ipc"
"github.com/gowvp/gb28181/internal/core/sms"
"github.com/gowvp/gb28181/pkg/gbs/m"
"github.com/gowvp/gb28181/pkg/gbs/sip"
@@ -24,13 +25,13 @@ type MemoryStorer interface {
LoadDeviceToMemory(conn sip.Connection) // 加载设备到内存
RangeDevices(fn func(key string, value *Device) bool) // 遍历设备
Change(deviceID string, changeFn func(*gb28181.Device), changeFn2 func(*Device)) error // 登出设备
Change(deviceID string, changeFn func(*ipc.Device), changeFn2 func(*Device)) error // 登出设备
Load(deviceID string) (*Device, bool)
Store(deviceID string, value *Device)
GetChannel(deviceID, channelID string) (*Channel, bool)
// Change(deviceID string, changeFn func(*gb28181.Device)) // 修改设备
// Change(deviceID string, changeFn func(*ipc.Device)) // 修改设备
}
type Server struct {
@@ -42,7 +43,7 @@ type Server struct {
memoryStorer MemoryStorer
}
func NewServer(cfg *conf.Bootstrap, store gb28181.GBDAdapter, sc sms.Core) (*Server, func()) {
func NewServer(cfg *conf.Bootstrap, store ipc.Adapter, sc sms.Core) (*Server, func()) {
api := NewGB28181API(cfg, store, sc.NodeManager)
iip := ip.InternalIP()
@@ -90,18 +91,20 @@ func NewServer(cfg *conf.Bootstrap, store gb28181.GBDAdapter, sc sms.Core) (*Ser
func (s *Server) startTickerCheck() {
conc.Timer(context.Background(), 60*time.Second, time.Second, func() {
now := time.Now()
s.memoryStorer.RangeDevices(func(key string, ipc *Device) bool {
if !ipc.IsOnline {
s.memoryStorer.RangeDevices(func(key string, dev *Device) bool {
if !dev.IsOnline {
return true
}
timeout := time.Duration(ipc.keepaliveTimeout) * time.Duration(ipc.keepaliveInterval) * time.Second
if !bz.IsGB28181(key) {
return true
}
timeout := time.Duration(dev.keepaliveTimeout) * time.Duration(dev.keepaliveInterval) * time.Second
if timeout <= 0 {
timeout = 3 * 60 * time.Second
}
if sub := now.Sub(ipc.LastKeepaliveAt); sub >= timeout || ipc.conn == nil {
s.gb.logout(key, func(d *gb28181.Device) {
if sub := now.Sub(dev.LastKeepaliveAt); sub >= timeout || dev.conn == nil {
s.gb.logout(key, func(d *ipc.Device) {
d.IsOnline = false
})
}
@@ -212,8 +215,8 @@ func (s *Server) Play(in *PlayInput) error {
return s.gb.Play(in)
}
func (s *Server) StopPlay(in *StopPlayInput) error {
return s.gb.StopPlay(in)
func (s *Server) StopPlay(ctx context.Context, in *StopPlayInput) error {
return s.gb.StopPlay(ctx, in)
}
// QuerySnapshot 厂商实现抓图的少,sip 层已实现,先搁置