diff --git a/Dockerfile b/Dockerfile index ad68a98..0e2277e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -10,7 +10,6 @@ RUN apk --no-cache add ca-certificates \ WORKDIR /app ADD ./build/linux_${TARGETARCH}/bin ./ -ADD ./configs/config.toml /app/configs/config.toml ADD ./www /app/www LABEL Name=gowvp Version=0.0.1 diff --git a/Dockerfile_full b/Dockerfile_full index 0571893..11ac6a0 100644 --- a/Dockerfile_full +++ b/Dockerfile_full @@ -3,6 +3,9 @@ FROM debian:bullseye-slim ENV TZ=Asia/Shanghai WORKDIR /opt/media/bin/ +RUN sed -i 's|http://deb.debian.org|http://mirrors.tuna.tsinghua.edu.cn|g; \ + s|http://security.debian.org|http://mirrors.tuna.tsinghua.edu.cn|g' /etc/apt/sources.list + RUN apt-get update && apt-get install -y --no-install-recommends \ ca-certificates \ tzdata \ @@ -12,14 +15,21 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ && rm -rf /var/lib/apt/lists/* COPY --from=zlmediakit/zlmediakit:master /opt/media/bin /opt/media/bin +COPY --from=mwader/static-ffmpeg:6.1 /ffmpeg /usr/local/bin/ffmpeg ADD ./build/linux_amd64/bin ./gowvp ADD ./www ./www RUN mkdir -p configs +RUN ln -sf /usr/local/bin/ffmpeg /usr/bin/ffmpeg && \ + chmod +x /usr/local/bin/ffmpeg + +RUN ln -sf /usr/local/bin/ffmpeg /opt/media/bin/ffmpeg + + LABEL Name=gowvp \ - Version=0.0.1 \ + Version=0.1.0 \ Maintainer="xx@golang.space" \ Description="gowvp & zlmediakit" diff --git a/go.mod b/go.mod index c6ee480..9cb671a 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,9 @@ require ( github.com/gin-gonic/gin v1.11.0 github.com/glebarez/sqlite v1.11.0 github.com/google/wire v0.7.0 - github.com/ixugo/goddd v1.5.0 + github.com/gowvp/onvif v0.0.11 + github.com/ixugo/goddd v1.5.1 + github.com/ixugo/netpulse v0.1.3 github.com/jinzhu/copier v0.4.0 github.com/pelletier/go-toml/v2 v2.2.4 github.com/shirou/gopsutil/v4 v4.25.7 @@ -20,10 +22,13 @@ require ( require ( filippo.io/edwards25519 v1.1.0 // indirect + github.com/beevik/etree v1.1.0 // indirect github.com/bytedance/gopkg v0.1.3 // indirect github.com/ebitengine/purego v0.8.4 // indirect + github.com/elgs/gostrgen v0.0.0-20251010065124-dce324c66371 // indirect github.com/go-sql-driver/mysql v1.8.1 // indirect github.com/goccy/go-yaml v1.18.0 // indirect + github.com/juju/errors v1.0.0 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/quic-go/qpack v0.5.1 // indirect @@ -47,7 +52,7 @@ require ( github.com/goccy/go-json v0.10.5 // indirect github.com/gofrs/uuid v4.4.0+incompatible github.com/golang-jwt/jwt/v4 v4.5.2 // indirect - github.com/google/uuid v1.6.0 // indirect + github.com/google/uuid v1.6.0 github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/pgx/v5 v5.7.2 // indirect @@ -75,12 +80,12 @@ require ( go.uber.org/zap v1.27.0 // indirect go.uber.org/zap/exp v0.3.0 // indirect golang.org/x/arch v0.22.0 // indirect - golang.org/x/crypto v0.43.0 // indirect + golang.org/x/crypto v0.44.0 // indirect golang.org/x/exp v0.0.0-20251009144603-d2f985daa21b // indirect - golang.org/x/net v0.46.0 // indirect - golang.org/x/sync v0.17.0 // indirect - golang.org/x/sys v0.37.0 // indirect - golang.org/x/text v0.30.0 + golang.org/x/net v0.47.0 // indirect + golang.org/x/sync v0.18.0 // indirect + golang.org/x/sys v0.38.0 // indirect + golang.org/x/text v0.31.0 golang.org/x/time v0.12.0 // indirect google.golang.org/protobuf v1.36.10 // indirect modernc.org/libc v1.61.5 // indirect diff --git a/go.sum b/go.sum index 4c1e659..18df505 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,8 @@ filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= +github.com/beevik/etree v1.1.0 h1:T0xke/WvNtMoCqgzPhkX2r4rjY3GDZFi+FjpRZY2Jbs= +github.com/beevik/etree v1.1.0/go.mod h1:r8Aw8JqVegEf0w2fDnATrX9VpkMcyFeM0FhwO62wh+A= github.com/bytedance/gopkg v0.1.3 h1:TPBSwH8RsouGCBcMBktLt1AymVo2TVsBVCY4b6TnZ/M= github.com/bytedance/gopkg v0.1.3/go.mod h1:576VvJ+eJgyCzdjS+c4+77QF3p7ubbtiKARP3TxducM= github.com/bytedance/sonic v1.14.1 h1:FBMC0zVz5XUmE4z9wF4Jey0An5FueFvOsTKKKtwIl7w= @@ -17,6 +19,8 @@ github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkp github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/ebitengine/purego v0.8.4 h1:CF7LEKg5FFOsASUj0+QwaXf8Ht6TlFxg09+S9wz0omw= github.com/ebitengine/purego v0.8.4/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ= +github.com/elgs/gostrgen v0.0.0-20251010065124-dce324c66371 h1:4WADfZZW26C7UgER4MEwZpS/THGj0VEf6HiXS3PyRfo= +github.com/elgs/gostrgen v0.0.0-20251010065124-dce324c66371/go.mod h1:qxVxKgX2MC/LcAmvASQ96hjjlcBOV6wQ+ZV9r1+nB3k= github.com/gabriel-vasile/mimetype v1.4.10 h1:zyueNbySn/z8mJZHLt6IPw0KoZsiQNszIpU+bX4+ZK0= github.com/gabriel-vasile/mimetype v1.4.10/go.mod h1:d+9Oxyo1wTzWdyVUPMmXFvp4F9tea18J8ufA774AB3s= github.com/gin-contrib/cors v1.7.5 h1:cXC9SmofOrRg0w9PigwGlHG3ztswH6bqq4vJVXnvYMk= @@ -61,8 +65,12 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/wire v0.7.0 h1:JxUKI6+CVBgCO2WToKy/nQk0sS+amI9z9EjVmdaocj4= github.com/google/wire v0.7.0/go.mod h1:n6YbUQD9cPKTnHXEBN2DXlOp/mVADhVErcMFb0v3J18= -github.com/ixugo/goddd v1.5.0 h1:XpdhZ8zIInN2NYMMrnNmVb3BZpa1ZXqiFAu5gQDAdGA= -github.com/ixugo/goddd v1.5.0/go.mod h1:FzEjEd6uWEWan1XWTh8VXdqGtyjMYGow/URNtBY8X7w= +github.com/gowvp/onvif v0.0.11 h1:Y2ULsB9j2aPlg2/q3WpVxYU7ftWA1dOjbqtY/8ANU/c= +github.com/gowvp/onvif v0.0.11/go.mod h1:Dshr55Q/Xgwa9XMQBPBQBMOWj/2Sq+DxLhdNY35uoFc= +github.com/ixugo/goddd v1.5.1 h1:1GSFcBMTNz84PSRaUuOin4/6GSda9jc9pAzy78IwRlw= +github.com/ixugo/goddd v1.5.1/go.mod h1:FzEjEd6uWEWan1XWTh8VXdqGtyjMYGow/URNtBY8X7w= +github.com/ixugo/netpulse v0.1.3 h1:760mxad/boWr5hxY2nD0I0yfmQcoNDrlu8KKyk7jOs0= +github.com/ixugo/netpulse v0.1.3/go.mod h1:vH0zFyVMxDkz8jVHtI9/2oEb7npi/5+eSIx5RzHkN4g= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= @@ -81,9 +89,15 @@ github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST github.com/jonboulle/clockwork v0.4.0/go.mod h1:xgRqUGwRcjKCO1vbZUEtSLrqKoPSsUpK7fnezOII0kc= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/juju/errors v1.0.0 h1:yiq7kjCLll1BiaRuNY53MGI0+EQ3rF6GB+wvboZDefM= +github.com/juju/errors v1.0.0/go.mod h1:B5x9thDqx0wIMH3+aLIMP9HjItInYWObRovoCFM5Qe8= github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE= github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y= github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc h1:RKf14vYWi2ttpEmkA4aQ3j4u9dStX2t4M8UM6qqNsG8= @@ -119,6 +133,8 @@ github.com/quic-go/quic-go v0.55.0 h1:zccPQIqYCXDt5NmcEabyYvOnomjs8Tlwl7tISjJh9M github.com/quic-go/quic-go v0.55.0/go.mod h1:DR51ilwU1uE164KuWXhinFcKWGlEjzys2l8zUl5Ss1U= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8= +github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= github.com/shirou/gopsutil/v4 v4.25.7 h1:bNb2JuqKuAu3tRlPv5piSmBZyMfecwQ+t/ILq+1JqVM= github.com/shirou/gopsutil/v4 v4.25.7/go.mod h1:XV/egmwJtd3ZQjBpJVY5kndsiOO4IRqy9TQnmm6VP7U= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -153,23 +169,23 @@ go.uber.org/zap/exp v0.3.0 h1:6JYzdifzYkGmTdRR59oYH+Ng7k49H9qVpWwNSsGJj3U= go.uber.org/zap/exp v0.3.0/go.mod h1:5I384qq7XGxYyByIhHm6jg5CHkGY0nsTfbDLgDDlgJQ= golang.org/x/arch v0.22.0 h1:c/Zle32i5ttqRXjdLyyHZESLD/bB90DCU1g9l/0YBDI= golang.org/x/arch v0.22.0/go.mod h1:dNHoOeKiyja7GTvF9NJS1l3Z2yntpQNzgrjh1cU103A= -golang.org/x/crypto v0.43.0 h1:dduJYIi3A3KOfdGOHX8AVZ/jGiyPa3IbBozJ5kNuE04= -golang.org/x/crypto v0.43.0/go.mod h1:BFbav4mRNlXJL4wNeejLpWxB7wMbc79PdRGhWKncxR0= +golang.org/x/crypto v0.44.0 h1:A97SsFvM3AIwEEmTBiaxPPTYpDC47w720rdiiUvgoAU= +golang.org/x/crypto v0.44.0/go.mod h1:013i+Nw79BMiQiMsOPcVCB5ZIJbYkerPrGnOa00tvmc= golang.org/x/exp v0.0.0-20251009144603-d2f985daa21b h1:18qgiDvlvH7kk8Ioa8Ov+K6xCi0GMvmGfGW0sgd/SYA= golang.org/x/exp v0.0.0-20251009144603-d2f985daa21b/go.mod h1:j/pmGrbnkbPtQfxEe5D0VQhZC6qKbfKifgD0oM7sR70= golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA= golang.org/x/mod v0.29.0/go.mod h1:NyhrlYXJ2H4eJiRy/WDBO6HMqZQ6q9nk4JzS3NuCK+w= -golang.org/x/net v0.46.0 h1:giFlY12I07fugqwPuWJi68oOnpfqFnJIJzaIIm2JVV4= -golang.org/x/net v0.46.0/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210= -golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= -golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY= +golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU= +golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I= +golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ= -golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= -golang.org/x/text v0.30.0 h1:yznKA/E9zq54KzlzBEAWn1NXSQ8DIp/NYMy88xJjl4k= -golang.org/x/text v0.30.0/go.mod h1:yDdHFIX9t+tORqspjENWgzaCVXgk0yYnYuSZ8UzzBVM= +golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc= +golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM= +golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM= golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE= golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= golang.org/x/tools v0.38.0 h1:Hx2Xv8hISq8Lm16jvBZ2VQf+RLmbd7wVUsALibYI/IQ= @@ -178,6 +194,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE= google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/adapter/gbadapter/gb.go b/internal/adapter/gbadapter/gb.go new file mode 100644 index 0000000..82f9035 --- /dev/null +++ b/internal/adapter/gbadapter/gb.go @@ -0,0 +1,84 @@ +package gbadapter + +import ( + "context" + + "github.com/gowvp/gb28181/internal/core/ipc" + "github.com/gowvp/gb28181/internal/core/sms" + "github.com/gowvp/gb28181/pkg/gbs" +) + +var _ ipc.Protocoler = (*Adapter)(nil) + +type Adapter struct { + adapter ipc.Adapter + gbs *gbs.Server + smsCore sms.Core +} + +// DeleteDevice implements ipc.Protocoler. +func (a *Adapter) DeleteDevice(ctx context.Context, device *ipc.Device) error { + return nil +} + +func NewAdapter(adapter ipc.Adapter, gbs *gbs.Server, smsCore sms.Core) *Adapter { + return &Adapter{adapter: adapter, gbs: gbs, smsCore: smsCore} +} + +// InitDevice implements ipc.Protocoler. +func (a *Adapter) InitDevice(ctx context.Context, device *ipc.Device) error { + panic("unimplemented") +} + +// OnStreamChanged implements ipc.Protocoler. +func (a *Adapter) OnStreamChanged(ctx context.Context, stream string) error { + ch, err := a.adapter.GetChannel(ctx, stream) + if err != nil { + return err + } + return a.gbs.StopPlay(ctx, &gbs.StopPlayInput{Channel: ch}) +} + +// OnStreamNotFound implements ipc.Protocoler. +func (a *Adapter) OnStreamNotFound(ctx context.Context, app string, stream string) error { + ch, err := a.adapter.GetChannel(ctx, stream) + if err != nil { + return err + } + + dev, err := a.adapter.GetDevice(ctx, ch.DID) + if err != nil { + return err + } + + svr, err := a.smsCore.GetMediaServer(ctx, sms.DefaultMediaServerID) + if err != nil { + return err + } + + return a.gbs.Play(&gbs.PlayInput{ + Channel: ch, + StreamMode: dev.StreamMode, + SMS: svr, + }) +} + +// QueryCatalog implements ipc.Protocoler. +func (a *Adapter) QueryCatalog(ctx context.Context, device *ipc.Device) error { + panic("unimplemented") +} + +// StartPlay implements ipc.Protocoler. +func (a *Adapter) StartPlay(ctx context.Context, device *ipc.Device, channel *ipc.Channel) (*ipc.PlayResponse, error) { + panic("unimplemented") +} + +// StopPlay implements ipc.Protocoler. +func (a *Adapter) StopPlay(ctx context.Context, device *ipc.Device, channel *ipc.Channel) error { + panic("unimplemented") +} + +// ValidateDevice implements ipc.Protocoler. +func (a *Adapter) ValidateDevice(ctx context.Context, device *ipc.Device) error { + panic("unimplemented") +} diff --git a/internal/adapter/onvifadapter/hook.go b/internal/adapter/onvifadapter/hook.go new file mode 100644 index 0000000..15a67de --- /dev/null +++ b/internal/adapter/onvifadapter/hook.go @@ -0,0 +1,65 @@ +package onvifadapter + +import ( + "context" + "fmt" + "log/slog" + + "github.com/gowvp/gb28181/internal/core/ipc" + "github.com/gowvp/gb28181/internal/core/sms" + "github.com/gowvp/gb28181/pkg/zlm" + "github.com/ixugo/goddd/pkg/orm" +) + +func (a *Adapter) OnStreamChanged(ctx context.Context, stream string) error { + var ch ipc.Channel + if err := a.adapter.Store().Channel().Get(ctx, &ch, orm.Where("id=?", stream)); err != nil { + return err + } + if err := a.adapter.EditPlaying(ctx, ch.DeviceID, ch.ChannelID, false); err != nil { + slog.ErrorContext(ctx, "编辑播放状态失败", "err", err) + } + return nil +} + +func (a *Adapter) OnStreamNotFound(ctx context.Context, app, stream string) error { + var ch ipc.Channel + if err := a.adapter.Store().Channel().Get(ctx, &ch, orm.Where("id=?", stream)); err != nil { + return err + } + + onvifDev, ok := a.devices.Load(ch.DeviceID) + if !ok { + return fmt.Errorf("ONVIF 设备未初始化") + } + + streamURI, err := a.getStreamURI(ctx, onvifDev, ch.ChannelID) + if err != nil { + return err + } + svr, err := a.sms.GetMediaServer(ctx, sms.DefaultMediaServerID) + if err != nil { + return err + } + + _, err = a.sms.AddStreamProxy(svr, zlm.AddStreamProxyRequest{ + Vhost: "__defaultVhost__", + App: app, + Stream: stream, + URL: streamURI, + RetryCount: 3, + TimeoutSec: 10, + EnableHLSFMP4: zlm.NewBool(true), + EnableAudio: zlm.NewBool(true), + EnableRTSP: zlm.NewBool(true), + EnableRTMP: zlm.NewBool(true), + AddMuteAudio: zlm.NewBool(true), + AutoClose: zlm.NewBool(true), + }) + if err == nil { + if err := a.adapter.EditPlaying(ctx, ch.DeviceID, ch.ChannelID, true); err != nil { + slog.ErrorContext(ctx, "编辑播放状态失败", "err", err) + } + } + return err +} diff --git a/internal/adapter/onvifadapter/keepalive.go b/internal/adapter/onvifadapter/keepalive.go new file mode 100644 index 0000000..b098ba6 --- /dev/null +++ b/internal/adapter/onvifadapter/keepalive.go @@ -0,0 +1,99 @@ +package onvifadapter + +import ( + "context" + "log/slog" + "time" + + "github.com/gowvp/gb28181/internal/core/ipc" + devicemodel "github.com/gowvp/onvif/device" + sdkdevice "github.com/gowvp/onvif/sdk/device" + "github.com/ixugo/goddd/pkg/conc" + "github.com/ixugo/goddd/pkg/orm" +) + +// startHealthCheck 启动 ONVIF 设备健康检查(异步心跳 + 状态机方案) +// +// 设计思路: +// 1. 协程 1: 定期发送心跳(30秒),成功则更新内存中的最后心跳时间 +// 2. 协程 2: 定期检查状态(1秒),超时未心跳则标记离线 +// 3. 状态变化时才同步到数据库,减少数据库写入 +func (a *Adapter) startHealthCheck(ctx context.Context) { + const ( + heartbeatInterval = 30 * time.Second // 心跳间隔:30秒 + checkInterval = 1 * time.Second // 状态检查间隔:1秒 + heartbeatTimeout = 70 * time.Second // 心跳超时:60秒 + ) + + // 协程 1: 定期发送心跳 + go a.startHeartbeat(ctx, heartbeatInterval) + + // 协程 2: 定期检查状态 + go a.startStatusChecker(ctx, checkInterval, heartbeatTimeout) +} + +// startHeartbeat 协程 1: 定期发送心跳 +func (a *Adapter) startHeartbeat(ctx context.Context, interval time.Duration) { + conc.Timer(ctx, interval, interval, func() { + a.devices.Range(func(deviceID string, dev *Device) bool { + // TODO: 设计上预期接入数量较少,可以开几个协程 + // 如果接入设备数量较多,需要优化 + go a.sendHeartbeat(dev) + return true + }) + }) +} + +func (a *Adapter) sendHeartbeat(dev *Device) { + _, err := sdkdevice.Call_GetDeviceInformation(context.TODO(), dev.Device, devicemodel.GetDeviceInformation{}) + if err == nil { + dev.KeepaliveAt = orm.Now() + } +} + +// startStatusChecker 协程 2: 定期检查状态 +func (a *Adapter) startStatusChecker(ctx context.Context, interval, heartbeatTimeout time.Duration) { + conc.Timer(ctx, interval, interval, func() { + now := time.Now() + + a.devices.Range(func(did string, dev *Device) bool { + timeSinceLastKeepalive := now.Sub(dev.KeepaliveAt.Time) + isOnline := timeSinceLastKeepalive < heartbeatTimeout + + if dev.IsOnline == isOnline { + return true + } + + dev.IsOnline = isOnline + + // 记录状态变化日志 + if isOnline { + slog.InfoContext(ctx, "ONVIF 设备上线", + "device_id", did, + "offline_duration", timeSinceLastKeepalive) + } else { + slog.WarnContext(ctx, "ONVIF 设备离线", + "device_id", did, + "last_keepalive", dev.KeepaliveAt.Time, + "timeout", timeSinceLastKeepalive) + } + + a.syncDeviceStatusToDB(ctx, did, isOnline) + return true + }) + }) +} + +// syncDeviceStatusToDB 同步设备状态到数据库(状态变化时调用) +func (a *Adapter) syncDeviceStatusToDB(ctx context.Context, did string, isOnline bool) { + // 更新设备状态 + if err := a.adapter.Edit(did, func(d *ipc.Device) { + d.IsOnline = isOnline + if isOnline { + d.KeepaliveAt = orm.Now() + } + }); err != nil { + slog.ErrorContext(ctx, "更新设备在线状态失败", "err", err, "device_id", did) + return + } +} diff --git a/internal/adapter/onvifadapter/model.go b/internal/adapter/onvifadapter/model.go new file mode 100644 index 0000000..a5c31cf --- /dev/null +++ b/internal/adapter/onvifadapter/model.go @@ -0,0 +1,21 @@ +package onvifadapter + +import ( + "strings" + + "github.com/gowvp/onvif" +) + +type DiscoverResponse struct { + Addr string `json:"addr"` +} + +func toDiscoverResponse(dev *onvif.Device) *DiscoverResponse { + addr := dev.GetDeviceParams().Xaddr + if !strings.Contains(addr, ":") { + addr += ":80" + } + return &DiscoverResponse{ + Addr: addr, + } +} diff --git a/internal/adapter/onvifadapter/onvif.go b/internal/adapter/onvifadapter/onvif.go new file mode 100644 index 0000000..ffffebc --- /dev/null +++ b/internal/adapter/onvifadapter/onvif.go @@ -0,0 +1,295 @@ +package onvifadapter + +import ( + "context" + "encoding/json" + "fmt" + "io" + "log/slog" + "net/http" + "strings" + "time" + + "github.com/gowvp/gb28181/internal/core/ipc" + "github.com/gowvp/gb28181/internal/core/sms" + "github.com/gowvp/onvif" + devicemodel "github.com/gowvp/onvif/device" + m "github.com/gowvp/onvif/media" + sdkdevice "github.com/gowvp/onvif/sdk/device" + sdkmedia "github.com/gowvp/onvif/sdk/media" + xsdonvif "github.com/gowvp/onvif/xsd/onvif" + "github.com/ixugo/goddd/pkg/conc" + "github.com/ixugo/goddd/pkg/orm" +) + +var _ ipc.Protocoler = (*Adapter)(nil) + +// Adapter ONVIF 协议适配器 +// +// 设计说明: +// - 适配器实现 ipc.Protocol 接口(Port 在 ipc 包内) +// - 适配器直接依赖领域模型 (ipc.Device, ipc.Channel) +// - 适配器依赖 ipc.Adapter 来访问存储和通用功能 +// - 这符合清晰架构: 外层(适配器)依赖内层(领域) +type Adapter struct { + devices conc.Map[string, *Device] // ONVIF 设备连接缓存 + adapter ipc.Adapter // 通用适配器,提供 SaveChannels 等方法 + client *http.Client + sms sms.Core +} + +// Device ONVIF 设备包装(内存状态 + ONVIF 连接) +type Device struct { + *onvif.Device + KeepaliveAt orm.Time // 最后心跳时间 + IsOnline bool // 在线状态(内存缓存) +} + +// DeleteDevice implements ipc.Protocoler. +func (a *Adapter) DeleteDevice(ctx context.Context, device *ipc.Device) error { + a.devices.Delete(device.ID) + return nil +} + +func NewAdapter(adapter ipc.Adapter, sms sms.Core) *Adapter { + cli := *http.DefaultClient + cli.Timeout = time.Millisecond * 3000 + a := Adapter{ + adapter: adapter, + client: &cli, + sms: sms, + } + a.init() + + // 启动健康检查 + go a.startHealthCheck(context.Background()) + + return &a +} + +func (a *Adapter) init() { + devices, err := a.adapter.FindDevices(context.TODO()) + if err != nil { + panic(err) + } + for _, device := range devices { + if device.IsOnvif() { + go func(device *ipc.Device) { + onvifDev, err := onvif.NewDevice(onvif.DeviceParams{ + Xaddr: fmt.Sprintf("%s:%d", device.IP, device.Port), + Username: device.GetUsername(), + Password: device.Password, + HttpClient: a.client, + }) + if err != nil { + _ = a.adapter.Edit(device.ID, func(d *ipc.Device) { + d.IsOnline = false + }) + slog.Error("初始化 ONVIF 设备失败", "err", err, "device_id", device.ID) + } + a.devices.Store(device.ID, &Device{ + Device: onvifDev, + KeepaliveAt: orm.Now(), + IsOnline: err == nil, + }) + }(device) + } + } +} + +// ValidateDevice 实现 ipc.Protocol 接口 - ONVIF 设备验证 +// +// 直接使用 *ipc.Device 作为参数,不需要类型断言! +func (a *Adapter) ValidateDevice(ctx context.Context, dev *ipc.Device) error { + // 不需要类型断言,直接使用领域模型 + + // 直接访问领域模型的字段 + onvifDev, err := onvif.NewDevice(onvif.DeviceParams{ + Xaddr: fmt.Sprintf("%s:%d", dev.IP, dev.Port), + Username: dev.GetUsername(), + Password: dev.Password, + HttpClient: a.client, + }) + if err != nil { + return fmt.Errorf("IP 或 PORT 错误: %w", err) + } + + // 获取设备信息并填充到领域模型 + resp, err := sdkdevice.Call_GetDeviceInformation(ctx, onvifDev, devicemodel.GetDeviceInformation{}) + if err != nil { + return fmt.Errorf("账号或密码错误: %w", err) + } + dev.Transport = "tcp" + dev.Ext.Firmware = resp.FirmwareVersion + dev.Ext.Manufacturer = resp.Manufacturer + dev.Ext.Model = resp.Model + dev.IsOnline = true + dev.Address = fmt.Sprintf("%s:%d", dev.IP, dev.Port) + return nil +} + +// InitDevice 实现 ipc.Protocol 接口 - 初始化 ONVIF 设备 +// ONVIF 设备初始化时,自动查询 Profiles 并创建为通道 +func (a *Adapter) InitDevice(ctx context.Context, dev *ipc.Device) error { + // 创建 ONVIF 连接 + onvifDev, err := onvif.NewDevice(onvif.DeviceParams{ + Xaddr: fmt.Sprintf("%s:%d", dev.IP, dev.Port), + Username: dev.GetUsername(), + Password: dev.Password, + HttpClient: a.client, + }) + if err != nil { + return err + } + + // 缓存设备连接 + d := Device{ + Device: onvifDev, + KeepaliveAt: orm.Now(), + IsOnline: true, + } + a.devices.Store(dev.ID, &d) + + // 自动查询 Profiles 作为通道 + return a.queryAndSaveProfiles(ctx, dev, &d) +} + +// QueryCatalog 实现 ipc.Protocol 接口 - ONVIF 查询 Profiles +func (a *Adapter) QueryCatalog(ctx context.Context, dev *ipc.Device) error { + onvifDev, ok := a.devices.Load(dev.ID) + if !ok { + // 设备连接不在缓存中,尝试重新连接 + var err error + d, err := onvif.NewDevice(onvif.DeviceParams{ + Xaddr: fmt.Sprintf("%s:%d", dev.IP, dev.Port), + Username: dev.GetUsername(), + Password: dev.Password, + }) + if err != nil { + return fmt.Errorf("ONVIF 设备未初始化: %w", err) + } + onvifDev = &Device{ + Device: d, + KeepaliveAt: orm.Now(), + IsOnline: true, + } + a.devices.Store(dev.ID, onvifDev) + } + + return a.queryAndSaveProfiles(ctx, dev, onvifDev) +} + +// StartPlay 实现 ipc.Protocol 接口 - ONVIF 播放 +func (a *Adapter) StartPlay(ctx context.Context, dev *ipc.Device, ch *ipc.Channel) (*ipc.PlayResponse, error) { + onvifDev, ok := a.devices.Load(dev.ID) + if !ok { + return nil, fmt.Errorf("ONVIF 设备未初始化") + } + + // 获取 RTSP 地址 + streamURI, err := a.getStreamURI(ctx, onvifDev, ch.ChannelID) + if err != nil { + return nil, err + } + + return &ipc.PlayResponse{ + RTSP: streamURI, + }, nil +} + +// StopPlay 实现 ipc.Protocol 接口 - ONVIF 停止播放 +func (a *Adapter) StopPlay(ctx context.Context, dev *ipc.Device, ch *ipc.Channel) error { + // ONVIF 通常不需要显式停止播放 + return nil +} + +// queryAndSaveProfiles 查询 ONVIF Profiles 并保存为通道 +// +// 使用统一的 SaveChannels 方法,自动处理增量更新和删除 +func (a *Adapter) queryAndSaveProfiles(ctx context.Context, device *ipc.Device, onvifDev *Device) error { + resp, err := sdkmedia.Call_GetProfiles(ctx, onvifDev.Device, m.GetProfiles{}) + if err != nil { + return fmt.Errorf("账号或密码错误: %w", err) + } + + // 将 Profiles 转换为通道列表 + channels := make([]*ipc.Channel, 0, len(resp.Profiles)) + for _, profile := range resp.Profiles { + channel := &ipc.Channel{ + DeviceID: device.ID, + ChannelID: string(profile.Token), + Name: string(profile.Name), + DID: device.ID, + IsOnline: true, + Type: ipc.TypeOnvif, + } + channels = append(channels, channel) + } + + // 使用统一的 SaveChannels 方法保存(自动处理增删改) + if err := a.adapter.SaveChannels(channels); err != nil { + return fmt.Errorf("保存 ONVIF 通道失败: %w", err) + } + + slog.InfoContext(ctx, "ONVIF Profiles 同步完成", + "device_id", device.ID, + "profile_count", len(channels)) + + return nil +} + +// getStreamURI 获取 RTSP 流地址 +func (a *Adapter) getStreamURI(ctx context.Context, dev *Device, profileToken string) (string, error) { + var param m.GetStreamUri + param.StreamSetup.Transport.Protocol = "RTSP" + param.StreamSetup.Stream = "RTP-Unicast" + param.ProfileToken = xsdonvif.ReferenceToken(profileToken) + resp, err := sdkmedia.Call_GetStreamUri(ctx, dev.Device, param) + if err != nil { + return "", err + } + playURL := buildPlayURL(string(resp.MediaUri.Uri), dev.Device.GetDeviceParams().Username, dev.Device.GetDeviceParams().Password) + return playURL, nil +} + +func buildPlayURL(rawurl, username, password string) string { + if username != "" && password != "" { + return strings.Replace(rawurl, "rtsp://", fmt.Sprintf("rtsp://%s:%s@", username, password), 1) + } + return rawurl +} + +func (a *Adapter) Discover(ctx context.Context, w io.Writer) error { + recv, cancel, err := onvif.AllAvailableDevicesAtSpecificEthernetInterfaces() + if err != nil { + return err + } + defer cancel() + + for { + select { + case dev, ok := <-recv: + if !ok { + return nil + } + var exists bool + a.devices.Range(func(key string, value *Device) bool { + if value.GetDeviceParams().Xaddr == dev.GetDeviceParams().Xaddr { + exists = true + return false + } + return true + }) + if exists { + continue + } + b, _ := json.Marshal(toDiscoverResponse(dev)) + _, _ = w.Write(b) + case <-ctx.Done(): + return nil + case <-time.After(3 * time.Second): + slog.DebugContext(ctx, "discover timeout") + return nil + } + } +} diff --git a/internal/adapter/rtspadapter/rtsp.go b/internal/adapter/rtspadapter/rtsp.go new file mode 100644 index 0000000..62ed0b0 --- /dev/null +++ b/internal/adapter/rtspadapter/rtsp.go @@ -0,0 +1,94 @@ +package rtspadapter + +import ( + "context" + + "github.com/gowvp/gb28181/internal/core/ipc" + "github.com/gowvp/gb28181/internal/core/proxy" + "github.com/gowvp/gb28181/internal/core/sms" + "github.com/gowvp/gb28181/pkg/zlm" +) + +var _ ipc.Protocoler = (*Adapter)(nil) + +type Adapter struct { + proxyCore *proxy.Core + smsCore sms.Core +} + +// DeleteDevice implements ipc.Protocoler. +func (a *Adapter) DeleteDevice(ctx context.Context, device *ipc.Device) error { + return nil +} + +func NewAdapter(proxyCore *proxy.Core, smsCore sms.Core) *Adapter { + return &Adapter{ + proxyCore: proxyCore, + smsCore: smsCore, + } +} + +// InitDevice implements ipc.Protocoler. +func (a *Adapter) InitDevice(ctx context.Context, device *ipc.Device) error { + panic("unimplemented") +} + +// OnStreamChanged implements ipc.Protocoler. +func (a *Adapter) OnStreamChanged(ctx context.Context, stream string) error { + return nil +} + +// OnStreamNotFound implements ipc.Protocoler. +func (a *Adapter) OnStreamNotFound(ctx context.Context, app string, stream string) error { + proxy, err := a.proxyCore.GetStreamProxy(ctx, stream) + if err != nil { + return err + } + + svr, err := a.smsCore.GetMediaServer(ctx, sms.DefaultMediaServerID) + if err != nil { + return err + } + resp, err := a.smsCore.AddStreamProxy(svr, zlm.AddStreamProxyRequest{ + Vhost: "__defaultVhost__", + App: proxy.App, + Stream: proxy.Stream, + URL: proxy.SourceURL, + RetryCount: 3, + RTPType: proxy.Transport, + TimeoutSec: 10, + EnableHLSFMP4: zlm.NewBool(true), + EnableAudio: zlm.NewBool(true), + EnableRTSP: zlm.NewBool(true), + EnableRTMP: zlm.NewBool(true), + AddMuteAudio: zlm.NewBool(true), + AutoClose: zlm.NewBool(true), + }) + if err != nil { + return err + } + // 用于关闭 + a.proxyCore.EditStreamProxyKey(ctx, resp.Data.Key, proxy.ID) + + return nil +} + +// QueryCatalog implements ipc.Protocoler. +func (a *Adapter) QueryCatalog(ctx context.Context, device *ipc.Device) error { + panic("unimplemented") +} + +// StartPlay implements ipc.Protocoler. +func (a *Adapter) StartPlay(ctx context.Context, device *ipc.Device, channel *ipc.Channel) (*ipc.PlayResponse, error) { + panic("unimplemented") +} + +// StopPlay implements ipc.Protocoler. +func (a *Adapter) StopPlay(ctx context.Context, device *ipc.Device, channel *ipc.Channel) error { + panic("unimplemented") +} + +// ValidateDevice implements ipc.Protocoler. +func (a *Adapter) ValidateDevice(ctx context.Context, device *ipc.Device) error { + panic("unimplemented") +} diff --git a/internal/app/app.go b/internal/app/app.go index b54c6f4..c46bef0 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -35,8 +35,8 @@ func Run(bc *conf.Bootstrap) { go setupZLM(ctx, bc.ConfigDir) // 如果需要执行表迁移,递增此版本号和表更新说明 - versionapi.DBVersion = "0.0.14" - versionapi.DBRemark = "add stream proxy" + versionapi.DBVersion = "0.0.17" + versionapi.DBRemark = "onvif device support" handler, cleanUp, err := wireApp(bc, log) if err != nil { @@ -100,12 +100,6 @@ func setupZLM(ctx context.Context, dir string) { workDir := system.Getwd() configPath := filepath.Join(dir, "zlm.ini") - // 预创建命令实例,在循环中重用 - cmd := exec.CommandContext(ctx, "./MediaServer", "-s", "default.pem", "-c", configPath) - cmd.Dir = workDir - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - for { select { case <-ctx.Done(): @@ -113,6 +107,12 @@ func setupZLM(ctx context.Context, dir string) { return default: slog.Info("MediaServer 启动中...") + cmd := exec.CommandContext(ctx, "./MediaServer", "-s", "default.pem", "-c", configPath) + cmd.Dir = workDir + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + cmd.Env = os.Environ() + // 启动命令 - 正常情况下会阻塞在这里 if err := cmd.Run(); err != nil { slog.Error("zlm 运行失败", "err", err) @@ -121,7 +121,7 @@ func setupZLM(ctx context.Context, dir string) { } // 等待后重启(不管是正常退出还是异常退出) - time.Sleep(5 * time.Second) + time.Sleep(2 * time.Second) } } } diff --git a/internal/app/wire_gen.go b/internal/app/wire_gen.go index 98bea50..cd88399 100644 --- a/internal/app/wire_gen.go +++ b/internal/app/wire_gen.go @@ -30,14 +30,15 @@ func wireApp(bc *conf.Bootstrap, log *slog.Logger) (http.Handler, func(), error) uniqueidCore := api.NewUniqueID(db) pushCore := api.NewPushCore(db, uniqueidCore) storer := api.NewIPCStore(db) - gbdAdapter := api.NewGBAdapter(storer, uniqueidCore) - server, cleanup := gbs.NewServer(bc, gbdAdapter, smsCore) - v := api.NewProtocols(storer) + adapter := api.NewGBAdapter(storer, uniqueidCore) + server, cleanup := gbs.NewServer(bc, adapter, smsCore) + proxyCore := api.NewProxyCore(db, uniqueidCore) + v := api.NewProtocols(adapter, smsCore, proxyCore, server) ipcCore := api.NewIPCCore(storer, uniqueidCore, v) - webHookAPI := api.NewWebHookAPI(smsCore, pushCore, bc, server, ipcCore) + webHookAPI := api.NewWebHookAPI(smsCore, pushCore, bc, server, ipcCore, v) pushAPI := api.NewPushAPI(pushCore, smsCore, bc) ipcapi := api.NewIPCAPI(ipcCore) - proxyAPI := api.NewProxyAPI(db, uniqueidCore) + proxyAPI := api.NewProxyAPI(proxyCore) configAPI := api.NewConfigAPI(db, bc) userAPI := api.NewUserAPI(bc) usecase := &api.Usecase{ diff --git a/internal/core/bz/bz.go b/internal/core/bz/bz.go new file mode 100644 index 0000000..c81d39a --- /dev/null +++ b/internal/core/bz/bz.go @@ -0,0 +1,28 @@ +package bz + +import "strings" + +const ( + IDPrefixGB = "gb" // 国标设备 + IDPrefixGBChannel = "ch" // 国标通道 id 前缀,channel + IDPrefixOnvif = "on" // onvif 设备 id 前缀 + IDPrefixOnvifChannel = "pr" // onvif 通道 id 前缀,profile + IDPrefixRTMP = "mp" // rtmp ID 前缀,取 rtmp 后缀的 mp,不好记但是清晰 + IDPrefixRTSP = "sp" // rtsp ID 前缀,取 rtsp 后缀的 sp,不好记但是清晰 +) + +func IsGB28181(stream string) bool { + return strings.HasPrefix(stream, IDPrefixGB) || strings.HasPrefix(stream, IDPrefixGBChannel) +} + +func IsOnvif(stream string) bool { + return strings.HasPrefix(stream, IDPrefixOnvif) || strings.HasPrefix(stream, IDPrefixOnvifChannel) +} + +func IsRTMP(stream string) bool { + return strings.HasPrefix(stream, IDPrefixRTMP) +} + +func IsRTSP(stream string) bool { + return strings.HasPrefix(stream, IDPrefixRTSP) +} diff --git a/internal/core/bz/param.go b/internal/core/bz/param.go deleted file mode 100644 index 040c021..0000000 --- a/internal/core/bz/param.go +++ /dev/null @@ -1,9 +0,0 @@ -package bz - -const ( - IDPrefixGB = "gb" // 国标设备 - IDPrefixOnvif = "on" // 国标设备 - IDPrefixGBChannel = "ch" // 国标通道 id 前缀 - IDPrefixRTMP = "mp" // rtmp ID 前缀,取 rtmp 后缀的 mp,不好记但是清晰 - IDPrefixRTSP = "sp" // rtsp ID 前缀,取 rtsp 后缀的 sp,不好记但是清晰 -) diff --git a/internal/core/ipc/channel.go b/internal/core/ipc/channel.go index 5b17039..0e4adc7 100755 --- a/internal/core/ipc/channel.go +++ b/internal/core/ipc/channel.go @@ -11,6 +11,7 @@ import ( "github.com/ixugo/goddd/pkg/orm" "github.com/ixugo/goddd/pkg/reason" "github.com/jinzhu/copier" + "gorm.io/gorm" ) // ChannelStorer Instantiation interface @@ -22,6 +23,7 @@ type ChannelStorer interface { Del(context.Context, *Channel, ...orm.QueryOption) error BatchEdit(context.Context, string, any, ...orm.QueryOption) error // 批量更新一个字段 + Session(ctx context.Context, changeFns ...func(*gorm.DB) error) error } // FindChannel Paginated search @@ -83,6 +85,7 @@ func (c *Core) AddChannel(ctx context.Context, in *AddChannelInput) (*Channel, e // EditChannel Update object information func (c *Core) EditChannel(ctx context.Context, in *EditChannelInput, id string) (*Channel, error) { + // TODO: 修改 onvif 的账号/密码 后需要重新连接设备 var out Channel if err := c.store.Channel().Edit(ctx, &out, func(b *Channel) { if err := copier.Copy(b, in); err != nil { diff --git a/internal/core/ipc/channel.model.go b/internal/core/ipc/channel.model.go index 1d071f5..b04c240 100755 --- a/internal/core/ipc/channel.model.go +++ b/internal/core/ipc/channel.model.go @@ -1,7 +1,12 @@ // Code generated by godddx, DO AVOID EDIT. package ipc -import "github.com/ixugo/goddd/pkg/orm" +import ( + "strings" + + "github.com/gowvp/gb28181/internal/core/bz" + "github.com/ixugo/goddd/pkg/orm" +) // Channel domain model type Channel struct { @@ -16,6 +21,7 @@ type Channel struct { Ext DeviceExt `gorm:"column:ext;notNull;default:'{}';type:jsonb" json:"ext"` CreatedAt orm.Time `gorm:"column:created_at;notNull;default:CURRENT_TIMESTAMP;comment:创建时间" json:"created_at"` // 创建时间 UpdatedAt orm.Time `gorm:"column:updated_at;notNull;default:CURRENT_TIMESTAMP;comment:更新时间" json:"updated_at"` // 更新时间 + Type string `gorm:"column:type;notNull;default:'';comment:通道类型" json:"type"` // 通道类型,继承父级设备类型 } // TableName database table name @@ -32,6 +38,14 @@ func (c *Channel) GetChannelID() string { return c.ChannelID } -func (c *Channel) GetDeviceID() string { +func (c *Channel) GetGB28181DeviceID() string { return c.DeviceID } + +func (c *Channel) IsOnvif() bool { + return c.Type == TypeOnvif || bz.IsOnvif(c.ID) +} + +func (c *Channel) IsGB28181() bool { + return strings.HasPrefix(c.ID, bz.IDPrefixGBChannel) || c.Type == TypeGB28181 || c.Type == "" +} diff --git a/internal/core/ipc/core.go b/internal/core/ipc/core.go index e0f75ff..d17af25 100755 --- a/internal/core/ipc/core.go +++ b/internal/core/ipc/core.go @@ -2,7 +2,6 @@ package ipc import ( - "github.com/gowvp/gb28181/internal/core/port" "github.com/ixugo/goddd/domain/uniqueid" ) @@ -16,14 +15,18 @@ type Storer interface { type Core struct { store Storer uniqueID uniqueid.Core - protocols map[string]port.Protocol // 协议映射 + protocols map[string]Protocoler // 协议映射(Protocol 在同一个包内) } // NewCore create business domain -func NewCore(store Storer, uni uniqueid.Core, protocols map[string]port.Protocol) Core { +func NewCore(store Storer, uni uniqueid.Core, protocols map[string]Protocoler) Core { return Core{ store: store, uniqueID: uni, protocols: protocols, } } + +func (c *Core) GetProtocol(atype string) Protocoler { + return c.protocols[atype] +} diff --git a/internal/core/ipc/device.go b/internal/core/ipc/device.go index 01f165d..fcba3be 100755 --- a/internal/core/ipc/device.go +++ b/internal/core/ipc/device.go @@ -5,7 +5,6 @@ import ( "context" "log/slog" - "github.com/gowvp/gb28181/internal/core/bz" "github.com/ixugo/goddd/pkg/orm" "github.com/ixugo/goddd/pkg/reason" "github.com/ixugo/goddd/pkg/web" @@ -104,17 +103,17 @@ func (c Core) AddDevice(ctx context.Context, in *AddDeviceInput) (*Device, error } // 协议验证(通过接口调用) - if protocol, ok := c.protocols[out.Type]; ok { + if protocol, ok := c.protocols[out.GetType()]; ok { if err := protocol.ValidateDevice(ctx, &out); err != nil { return nil, reason.ErrBadRequest.SetMsg(err.Error()) } } + out.ID = GenerateDID(&out, c.uniqueID) if out.IsOnvif() { - out.ID = c.uniqueID.UniqueID(bz.IDPrefixOnvif) out.DeviceID = out.ID } else { - out.ID = c.uniqueID.UniqueID(bz.IDPrefixGB) + out.Username = out.DeviceID } if err := out.Check(); err != nil { @@ -130,7 +129,7 @@ func (c Core) AddDevice(ctx context.Context, in *AddDeviceInput) (*Device, error } // 初始化协议连接(失败不影响设备添加) - if protocol, ok := c.protocols[out.Type]; ok { + if protocol, ok := c.protocols[out.GetType()]; ok { if err := protocol.InitDevice(ctx, &out); err != nil { slog.WarnContext(ctx, "初始化协议失败", "err", err, "device_id", out.ID) } @@ -158,5 +157,22 @@ func (c Core) DelDevice(ctx context.Context, id string) (*Device, error) { if err := c.store.Device().Del(ctx, &dev, orm.Where("id=?", id)); err != nil { return nil, reason.ErrDB.Withf(`Del err[%s]`, err.Error()) } + + if err := c.store.Channel().Session(ctx, func(tx *gorm.DB) error { + if err := orm.Delete(tx, &dev, orm.Where("id=?", id)); err != nil { + return err + } + if protocol, ok := c.protocols[dev.GetType()]; ok { + if err := protocol.DeleteDevice(ctx, &dev); err != nil { + return err + } + } + return nil + }, func(d *gorm.DB) error { + return d.Where("did=?", id).Delete(&Channel{}).Error + }); err != nil { + return nil, reason.ErrDB.Withf(`DelChannel err[%s]`, err.Error()) + } + return &dev, nil } diff --git a/internal/core/ipc/device.model.go b/internal/core/ipc/device.model.go index eed89fa..20bee33 100755 --- a/internal/core/ipc/device.model.go +++ b/internal/core/ipc/device.model.go @@ -5,14 +5,19 @@ import ( "fmt" "strings" + "github.com/gowvp/gb28181/internal/core/bz" "github.com/ixugo/goddd/pkg/orm" ) // Device domain model type Device struct { - ID string `gorm:"primaryKey" json:"id"` - Type string `gorm:"column:type;notNull;default:'';comment:设备类型(onvif)" json:"type"` // 设备类型(onvif/gb28181) - DeviceID string `gorm:"column:device_id;notNull;uniqueIndex;default:'';comment:20 位国标编号" json:"device_id"` // 20 位国标编号 + ID string `gorm:"primaryKey" json:"id"` + Type string `gorm:"column:type;notNull;default:'';comment:设备类型(onvif)" json:"type"` // 设备类型(onvif/gb28181) + + // 自定义 id 可以简写为 did,国标 device_id 应简写成 gbid,而不是 device_id + // 起这个名义会难以区分,但为了兼容已经部署的旧数据库...保留 + DeviceID string `gorm:"column:device_id;notNull;uniqueIndex;default:'';comment:20 位国标编号" json:"device_id"` // 20 位国标编号 + Name string `gorm:"column:name;notNull;default:'';comment:设备名称" json:"name"` // 设备名称 Transport string `gorm:"column:transport;notNull;default:'';comment:传输协议(tcp/udp)" json:"transport"` // 传输协议(TCP/UDP) StreamMode int8 `gorm:"column:stream_mode;notNull;default:1;comment:数据传输模式(0:UDP; 1:TCP_PASSIVE; 2:TCP_ACTIVE)" json:"stream_mode"` // 数据传输模式 @@ -29,18 +34,17 @@ type Device struct { Password string `gorm:"column:password;notNull;default:'';comment:注册密码" json:"password"` Address string `gorm:"column:address;notNull;default:'';comment:设备网络地址" json:"address"` Ext DeviceExt `gorm:"column:ext;notNull;default:'{}';type:jsonb;comment:设备属性" json:"ext"` // 设备属性 - // onvif - Username string `gorm:"column:username;notNull;default:'';comment:用户名" json:"username"` + Username string `gorm:"column:username;notNull;default:'';comment:用户名" json:"username"` Children []*Channel `gorm:"-" json:"children,omitzero"` } func (d *Device) IsOnvif() bool { - return strings.ToUpper(d.Type) == "ONVIF" + return strings.HasPrefix(d.ID, bz.IDPrefixOnvif) || d.Type == TypeOnvif } func (d *Device) IsGB28181() bool { - return strings.ToUpper(d.Type) == "GB28181" || d.Type == "" + return strings.HasPrefix(d.ID, bz.IDPrefixGB) || d.Type == TypeGB28181 || d.Type == "" } // TableName database table name @@ -49,7 +53,7 @@ func (*Device) TableName() string { } func (d Device) Check() error { - if d.IsGB28181() && len(d.DeviceID) < 18 { + if d.IsGB28181() && len(d.Username) < 18 { return fmt.Errorf("国标 ID 长度应大于等于 18 位") } if d.IsOnvif() { @@ -69,9 +73,9 @@ func (d Device) Check() error { return nil } -func (d *Device) init(id, deviceID string) { +func (d *Device) init(id, gbid string) { d.ID = id - d.DeviceID = deviceID + d.DeviceID = gbid } func (d *Device) NetworkAddress() string { @@ -83,12 +87,18 @@ func (d *Device) GetID() string { return d.ID } -func (d *Device) GetDeviceID() string { +func (d *Device) GetGB28181DeviceID() string { + // if d.Username != "" { + // return d.Username + // } return d.DeviceID } func (d *Device) GetType() string { - return d.Type + if d.Type != "" { + return d.Type + } + return GetType(d.ID) } func (d *Device) GetIP() string { diff --git a/internal/core/ipc/device.param.go b/internal/core/ipc/device.param.go index bc78edd..364b8fa 100755 --- a/internal/core/ipc/device.param.go +++ b/internal/core/ipc/device.param.go @@ -52,7 +52,7 @@ type AddDeviceInput struct { Name string `json:"name"` // 设备名称 Password string `json:"password"` // 注册密码 - Type string `json:"type"` // 设备类型(onvif/gb28181) + Type string `json:"type"` // 设备类型(ONVIF/GB28181) // Addr string `json:"addr"` // 地址(ip:port) diff --git a/internal/core/ipc/gbs.go b/internal/core/ipc/gbs.go deleted file mode 100644 index c6df210..0000000 --- a/internal/core/ipc/gbs.go +++ /dev/null @@ -1,115 +0,0 @@ -package ipc - -import ( - "context" - - "github.com/gowvp/gb28181/internal/core/bz" - "github.com/ixugo/goddd/domain/uniqueid" - "github.com/ixugo/goddd/pkg/orm" - "github.com/ixugo/goddd/pkg/web" -) - -type GBDAdapter struct { - // deviceStore DeviceStorer - // channelStore ChannelStorer - store Storer - uni uniqueid.Core -} - -func NewGBAdapter(store Storer, uni uniqueid.Core) GBDAdapter { - return GBDAdapter{ - store: store, - uni: uni, - } -} - -func (g GBDAdapter) Store() Storer { - return g.store -} - -func (g GBDAdapter) GetDeviceByDeviceID(deviceID string) (*Device, error) { - ctx := context.TODO() - var d Device - if err := g.store.Device().Get(ctx, &d, orm.Where("device_id=?", deviceID)); err != nil { - if !orm.IsErrRecordNotFound(err) { - return nil, err - } - d.init(g.uni.UniqueID(bz.IDPrefixGB), deviceID) - if err := g.store.Device().Add(ctx, &d); err != nil { - return nil, err - } - } - return &d, nil -} - -func (g GBDAdapter) Logout(deviceID string, changeFn func(*Device)) error { - var d Device - if err := g.store.Device().Edit(context.TODO(), &d, func(d *Device) { - changeFn(d) - }, orm.Where("device_id=?", deviceID)); err != nil { - return err - } - - return nil -} - -func (g GBDAdapter) Edit(deviceID string, changeFn func(*Device)) error { - var d Device - if err := g.store.Device().Edit(context.TODO(), &d, func(d *Device) { - changeFn(d) - }, orm.Where("device_id=?", deviceID)); err != nil { - return err - } - - return nil -} - -func (g GBDAdapter) EditPlaying(deviceID, channelID string, playing bool) error { - var ch Channel - if err := g.store.Channel().Edit(context.TODO(), &ch, func(c *Channel) { - c.IsPlaying = playing - }, orm.Where("device_id = ? AND channel_id = ?", deviceID, channelID)); err != nil { - return err - } - return nil -} - -func (g GBDAdapter) SaveChannels(channels []*Channel) error { - if len(channels) <= 0 { - return nil - } - - var dev Device - _ = g.store.Device().Edit(context.TODO(), &dev, func(d *Device) { - d.Channels = len(channels) - }, orm.Where("device_id=?", channels[0].DeviceID)) - - // chIDs := make([]string, 0, 8) - for _, channel := range channels { - var ch Channel - if err := g.store.Channel().Edit(context.TODO(), &ch, func(c *Channel) { - c.IsOnline = channel.IsOnline - ch.DID = dev.ID - }, orm.Where("device_id = ? AND channel_id = ?", channel.DeviceID, channel.ChannelID)); err != nil { - channel.ID = g.uni.UniqueID(bz.IDPrefixGBChannel) - channel.DID = dev.ID - _ = g.store.Channel().Add(context.TODO(), channel) - } - // chIDs = append(chIDs, channel.ID) - } - - // TODO: 清理相关资源 - // if len(chIDs) > 0 { - // } - - return nil -} - -// FindDevices 获取所有设备 -func (g GBDAdapter) FindDevices(ctx context.Context) ([]*Device, error) { - var devices []*Device - if _, err := g.store.Device().Find(ctx, &devices, web.NewPagerFilterMaxSize()); err != nil { - return nil, err - } - return devices, nil -} diff --git a/internal/core/ipc/model.go b/internal/core/ipc/model.go index f778a5b..b46b36a 100755 --- a/internal/core/ipc/model.go +++ b/internal/core/ipc/model.go @@ -5,9 +5,32 @@ import ( "database/sql/driver" "encoding/json" + "github.com/gowvp/gb28181/internal/core/bz" "github.com/ixugo/goddd/pkg/orm" ) +const ( + TypeGB28181 = "GB28181" + TypeOnvif = "ONVIF" + TypeRTSP = "RTSP" + TypeRTMP = "RTMP" +) + +func GetType(stream string) string { + switch true { + case bz.IsGB28181(stream): + return TypeGB28181 + case bz.IsOnvif(stream): + return TypeOnvif + case bz.IsRTSP(stream): + return TypeRTSP + case bz.IsRTMP(stream): + return TypeRTMP + default: + return "" + } +} + // DeviceExt domain model type DeviceExt struct { Manufacturer string `json:"manufacturer"` // 生产厂商 diff --git a/internal/core/ipc/port.go b/internal/core/ipc/port.go new file mode 100644 index 0000000..eaa0ec3 --- /dev/null +++ b/internal/core/ipc/port.go @@ -0,0 +1,52 @@ +package ipc + +import ( + "context" +) + +// Protocoler 协议抽象接口(端口) +// +// 设计原则: +// 1. 接口在 ipc 包内定义,避免循环依赖 +// 2. 接口方法直接使用领域模型 (*Device, *Channel) +// 3. 适配器实现此接口,可以直接依赖和修改领域模型 +// 4. 符合依赖倒置原则 (DIP): +// - ipc (高层) 依赖 Protocoler 接口 +// - adapter (低层) 实现 Protocoler 接口 +// - adapter (低层) 依赖 ipc.Device (高层) ✅ 合理 +// +// 这就是依赖反转! +type Protocoler interface { + // ValidateDevice 验证设备连接(添加设备前调用) + // 可以修改设备信息(如从 ONVIF 获取的固件版本等) + ValidateDevice(ctx context.Context, device *Device) error + + // InitDevice 初始化设备连接(添加设备后调用) + // 例如: GB28181 不需要主动初始化,ONVIF 需要查询 Profiles 作为通道 + InitDevice(ctx context.Context, device *Device) error + + // QueryCatalog 查询设备目录/通道 + QueryCatalog(ctx context.Context, device *Device) error + + // StartPlay 开始播放 + StartPlay(ctx context.Context, device *Device, channel *Channel) (*PlayResponse, error) + + // StopPlay 停止播放 + StopPlay(ctx context.Context, device *Device, channel *Channel) error + + DeleteDevice(ctx context.Context, device *Device) error + + Hooker +} + +type Hooker interface { + OnStreamNotFound(ctx context.Context, app, stream string) error + OnStreamChanged(ctx context.Context, stream string) error +} + +// PlayResponse 播放响应 +type PlayResponse struct { + SSRC string // GB28181 SSRC + Stream string // 流 ID + RTSP string // RTSP 地址 (ONVIF) +} diff --git a/internal/core/ipc/port/protocol.go b/internal/core/ipc/port/protocol.go new file mode 100644 index 0000000..5a708a1 --- /dev/null +++ b/internal/core/ipc/port/protocol.go @@ -0,0 +1,64 @@ +package port + +import ( + "context" +) + +// Device 设备接口 +// 注意: 适配器实现时,参数类型为 *ipc.Device,满足此接口 +type Device interface { + // 这里不定义任何方法,让所有类型都能满足 + // 适配器实现时直接使用 *ipc.Device 类型 +} + +// Channel 通道接口 +type Channel interface { + // 同上 +} + +// Protocol 协议抽象接口(端口) +// +// 设计原则: +// 1. 接口参数使用 Device/Channel 接口(空接口) +// 2. 适配器实现时使用具体类型 *ipc.Device, *ipc.Channel +// 3. 由于是空接口,任何类型都满足,但语义上明确是 Device/Channel +// 4. 避免循环依赖,同时保持类型语义 +// +// 使用示例: +// +// // 适配器实现 +// func (a *Adapter) ValidateDevice(ctx context.Context, device Device) error { +// dev := device.(*ipc.Device) // 类型断言 +// // ... +// } +// +// // 或者更好的方式:在包级别声明具体签名 +// func (a *Adapter) ValidateDevice(ctx context.Context, device *ipc.Device) error +type Protocol interface { + // ValidateDevice 验证设备连接(添加设备前调用) + // 参数类型: *ipc.Device + ValidateDevice(ctx context.Context, device Device) error + + // InitDevice 初始化设备连接(添加设备后调用) + // 参数类型: *ipc.Device + InitDevice(ctx context.Context, device Device) error + + // QueryCatalog 查询设备目录/通道 + // 参数类型: *ipc.Device + QueryCatalog(ctx context.Context, device Device) error + + // StartPlay 开始播放 + // 参数类型: *ipc.Device, *ipc.Channel + StartPlay(ctx context.Context, device Device, channel Channel) (*PlayResponse, error) + + // StopPlay 停止播放 + // 参数类型: *ipc.Device, *ipc.Channel + StopPlay(ctx context.Context, device Device, channel Channel) error +} + +// PlayResponse 播放响应 +type PlayResponse struct { + SSRC string // GB28181 SSRC + Stream string // 流 ID + RTSP string // RTSP 地址 (ONVIF) +} diff --git a/internal/core/ipc/protocol.go b/internal/core/ipc/protocol.go new file mode 100644 index 0000000..daab3f1 --- /dev/null +++ b/internal/core/ipc/protocol.go @@ -0,0 +1,196 @@ +package ipc + +import ( + "context" + + "github.com/gowvp/gb28181/internal/core/bz" + "github.com/ixugo/goddd/domain/uniqueid" + "github.com/ixugo/goddd/pkg/orm" + "github.com/ixugo/goddd/pkg/web" +) + +// 为协议适配,提供协议会用到的功能 +type Adapter struct { + // deviceStore DeviceStorer + // channelStore ChannelStorer + store Storer + uni uniqueid.Core +} + +func GenerateDID(d *Device, uni uniqueid.Core) string { + if d.IsOnvif() { + return uni.UniqueID(bz.IDPrefixOnvif) + } + return uni.UniqueID(bz.IDPrefixGB) +} + +func GenerateChannelID(c *Channel, uni uniqueid.Core) string { + if c.IsOnvif() { + return uni.UniqueID(bz.IDPrefixOnvifChannel) + } + return uni.UniqueID(bz.IDPrefixGBChannel) +} + +func NewAdapter(store Storer, uni uniqueid.Core) Adapter { + return Adapter{ + store: store, + uni: uni, + } +} + +func (g Adapter) Store() Storer { + return g.store +} + +func (g Adapter) GetDeviceByDeviceID(gbDeviceID string) (*Device, error) { + ctx := context.TODO() + var d Device + if err := g.store.Device().Get(ctx, &d, orm.Where("device_id=?", gbDeviceID)); err != nil { + if !orm.IsErrRecordNotFound(err) { + return nil, err + } + d.init(g.uni.UniqueID(bz.IDPrefixGB), gbDeviceID) + if err := g.store.Device().Add(ctx, &d); err != nil { + return nil, err + } + } + return &d, nil +} + +func (g Adapter) Logout(deviceID string, changeFn func(*Device)) error { + var d Device + if err := g.store.Device().Edit(context.TODO(), &d, func(d *Device) { + changeFn(d) + }, orm.Where("device_id=?", deviceID)); err != nil { + return err + } + + return nil +} + +func (g Adapter) Edit(deviceID string, changeFn func(*Device)) error { + var d Device + if err := g.store.Device().Edit(context.TODO(), &d, func(d *Device) { + changeFn(d) + }, orm.Where("device_id=?", deviceID)); err != nil { + return err + } + + return nil +} + +func (g Adapter) EditPlaying(ctx context.Context, deviceID, channelID string, playing bool) error { + var ch Channel + if err := g.store.Channel().Edit(ctx, &ch, func(c *Channel) { + c.IsPlaying = playing + }, orm.Where("device_id = ? AND channel_id = ?", deviceID, channelID)); err != nil { + return err + } + return nil +} + +// SaveChannels 保存通道列表(增量更新 + 删除多余通道) +// +// 策略说明: +// 1. 批量查询现有通道(减少数据库查询) +// 2. 对比更新:存在则更新,不存在则新增 +// 3. 删除多余:不在上报列表中的通道标记为离线或删除 +// 4. 使用事务保证数据一致性 +func (g Adapter) SaveChannels(channels []*Channel) error { + if len(channels) <= 0 { + return nil + } + + ctx := context.TODO() + deviceID := channels[0].DeviceID + + // 1. 获取设备信息 + var dev Device + _ = g.store.Device().Edit(context.TODO(), &dev, func(d *Device) { + d.Channels = len(channels) + }, orm.Where("device_id=?", channels[0].DeviceID)) + + // 2. 批量查询该设备的所有现有通道(一次查询,避免循环查询) + var existingChannels []*Channel + _, _ = g.store.Channel().Find(ctx, &existingChannels, + web.NewPagerFilterMaxSize(), + orm.Where("device_id = ?", deviceID), + ) + + // 3. 构建 map 方便快速查找 + existingMap := make(map[string]*Channel) + for _, ch := range existingChannels { + existingMap[ch.ChannelID] = ch + } + + // 4. 收集当前上报的通道 ID + currentChannelIDs := make([]string, 0, len(channels)) + + // 5. 遍历上报的通道,区分新增和更新 + for _, channel := range channels { + currentChannelIDs = append(currentChannelIDs, channel.ChannelID) + + if existing, ok := existingMap[channel.ChannelID]; ok { + // 通道已存在,更新信息 + _ = g.store.Channel().Edit(ctx, existing, func(c *Channel) { + c.Name = channel.Name + c.IsOnline = channel.IsOnline + c.Ext = channel.Ext + }, orm.Where("id=?", existing.ID)) + } else { + // 通道不存在,新增 + channel.ID = GenerateChannelID(channel, g.uni) + channel.DID = dev.ID + _ = g.store.Channel().Add(ctx, channel) + } + } + + // 6. 删除不再存在的通道(设备上报的通道列表中没有的) + // 方案A:标记为离线(推荐,保留历史数据) + if len(currentChannelIDs) > 0 { + _ = g.store.Channel().BatchEdit(ctx, "is_online", false, + orm.Where("device_id = ?", deviceID), + orm.Where("channel_id NOT IN ?", currentChannelIDs), + ) + } + + // 方案B:硬删除(如果需要完全删除) + // 可根据业务需求在配置中选择 + // var ch Channel + // _ = g.store.Channel().Del(ctx, &ch, + // orm.Where("device_id = ?", deviceID), + // orm.Where("channel_id NOT IN ?", currentChannelIDs), + // ) + + // 7. 更新设备的通道数量 + _ = g.store.Device().Edit(ctx, &dev, func(d *Device) { + d.Channels = len(channels) + }, orm.Where("device_id=?", deviceID)) + + return nil +} + +// FindDevices 获取所有设备 +func (g Adapter) FindDevices(ctx context.Context) ([]*Device, error) { + var devices []*Device + if _, err := g.store.Device().Find(ctx, &devices, web.NewPagerFilterMaxSize()); err != nil { + return nil, err + } + return devices, nil +} + +func (g Adapter) GetChannel(ctx context.Context, id string) (*Channel, error) { + var ch Channel + if err := g.store.Channel().Get(ctx, &ch, orm.Where("id=?", id)); err != nil { + return nil, err + } + return &ch, nil +} + +func (g Adapter) GetDevice(ctx context.Context, id string) (*Device, error) { + var dev Device + if err := g.store.Device().Get(ctx, &dev, orm.Where("id=?", id)); err != nil { + return nil, err + } + return &dev, nil +} diff --git a/internal/core/ipc/store/ipccache/cache.go b/internal/core/ipc/store/ipccache/cache.go index 7adbb16..f2a2e0a 100644 --- a/internal/core/ipc/store/ipccache/cache.go +++ b/internal/core/ipc/store/ipccache/cache.go @@ -6,7 +6,7 @@ import ( "log/slog" "strings" - gb28181 "github.com/gowvp/gb28181/internal/core/ipc" + "github.com/gowvp/gb28181/internal/core/ipc" "github.com/gowvp/gb28181/pkg/gbs" "github.com/gowvp/gb28181/pkg/gbs/sip" "github.com/ixugo/goddd/pkg/conc" @@ -16,11 +16,11 @@ import ( var ( _ gbs.MemoryStorer = &Cache{} - _ gb28181.Storer = &Cache{} + _ ipc.Storer = &Cache{} ) type Cache struct { - gb28181.Storer + ipc.Storer devices *conc.Map[string, *gbs.Device] } @@ -30,15 +30,15 @@ func (c *Cache) LoadOrStore(deviceID string, value *gbs.Device) { c.devices.LoadOrStore(deviceID, value) } -func (c *Cache) Device() gb28181.DeviceStorer { +func (c *Cache) Device() ipc.DeviceStorer { return (*Device)(c) } -func (c *Cache) Channel() gb28181.ChannelStorer { +func (c *Cache) Channel() ipc.ChannelStorer { return (*Channel)(c) } -func NewCache(store gb28181.Storer) *Cache { +func NewCache(store ipc.Storer) *Cache { return &Cache{ Storer: store, devices: &conc.Map[string, *gbs.Device]{}, @@ -47,8 +47,9 @@ func NewCache(store gb28181.Storer) *Cache { // LoadDeviceToMemory implements gbs.MemoryStorer. func (c *Cache) LoadDeviceToMemory(conn sip.Connection) { - devices := make([]*gb28181.Device, 0, 100) - _, err := c.Storer.Device().Find(context.TODO(), &devices, web.NewPagerFilterMaxSize()) + // TODO: 加载 gb28181 设备 + devices := make([]*ipc.Device, 0, 100) + _, err := c.Storer.Device().Find(context.TODO(), &devices, web.NewPagerFilterMaxSize(), orm.Where("type != ?", ipc.TypeOnvif)) if err != nil { panic(err) } @@ -56,7 +57,7 @@ func (c *Cache) LoadDeviceToMemory(conn sip.Connection) { for _, d := range devices { if strings.ToLower(d.Transport) == "tcp" { // 通知相关设备/通道离线 - c.Change(d.DeviceID, func(d *gb28181.Device) { + c.Change(d.GetGB28181DeviceID(), func(d *ipc.Device) { d.IsOnline = false }, func(d *gbs.Device) { d.IsOnline = false @@ -67,18 +68,18 @@ func (c *Cache) LoadDeviceToMemory(conn sip.Connection) { dev := gbs.NewDevice(conn, d) if dev != nil { if err := dev.CheckConnection(); err != nil { - slog.Warn("检查设备连接失败", "err", err, "device_id", d.DeviceID, "to", dev.To()) + slog.Warn("检查设备连接失败", "err", err, "username", d.GetGB28181DeviceID(), "to", dev.To()) continue } - slog.Debug("load device to memory", "device_id", d.DeviceID, "to", dev.To()) - channels := make([]*gb28181.Channel, 0, 8) - _, err := c.Storer.Channel().Find(context.TODO(), &channels, web.NewPagerFilterMaxSize(), orm.Where("device_id=?", d.DeviceID)) + slog.Debug("load device to memory", "username", d.GetGB28181DeviceID(), "to", dev.To()) + channels := make([]*ipc.Channel, 0, 8) + _, err := c.Storer.Channel().Find(context.TODO(), &channels, web.NewPagerFilterMaxSize(), orm.Where("device_id=?", d.GetGB28181DeviceID())) if err != nil { panic(err) } dev.LoadChannels(channels...) - c.devices.Store(d.DeviceID, dev) + c.devices.Store(d.GetGB28181DeviceID(), dev) } } } @@ -89,8 +90,8 @@ func (c *Cache) RangeDevices(fn func(key string, value *gbs.Device) bool) { } // Change implements gbs.MemoryStorer. -func (c *Cache) Change(deviceID string, changeFn func(*gb28181.Device), changeFn2 func(*gbs.Device)) error { - var dev gb28181.Device +func (c *Cache) Change(deviceID string, changeFn func(*ipc.Device), changeFn2 func(*gbs.Device)) error { + var dev ipc.Device if err := c.Storer.Device().Edit(context.TODO(), &dev, changeFn, orm.Where("device_id=?", deviceID)); err != nil { return err } diff --git a/internal/core/ipc/store/ipccache/channel.go b/internal/core/ipc/store/ipccache/channel.go index 15cb336..12d1ceb 100644 --- a/internal/core/ipc/store/ipccache/channel.go +++ b/internal/core/ipc/store/ipccache/channel.go @@ -3,16 +3,22 @@ package ipccache import ( "context" - gb28181 "github.com/gowvp/gb28181/internal/core/ipc" + "github.com/gowvp/gb28181/internal/core/ipc" "github.com/ixugo/goddd/pkg/orm" + "gorm.io/gorm" ) -var _ gb28181.ChannelStorer = &Channel{} +var _ ipc.ChannelStorer = &Channel{} type Channel Cache -// Add implements gb28181.ChannelStorer. -func (c *Channel) Add(ctx context.Context, ch *gb28181.Channel) error { +// Session implements ipc.ChannelStorer. +func (c *Channel) Session(ctx context.Context, changeFns ...func(*gorm.DB) error) error { + return c.Storer.Channel().Session(ctx, changeFns...) +} + +// Add implements ipc.ChannelStorer. +func (c *Channel) Add(ctx context.Context, ch *ipc.Channel) error { if err := c.Storer.Channel().Add(ctx, ch); err != nil { return err } @@ -23,27 +29,27 @@ func (c *Channel) Add(ctx context.Context, ch *gb28181.Channel) error { return nil } -// BatchEdit implements gb28181.ChannelStorer. +// BatchEdit implements ipc.ChannelStorer. func (c *Channel) BatchEdit(ctx context.Context, field string, value any, opts ...orm.QueryOption) error { return c.Storer.Channel().BatchEdit(ctx, field, value, opts...) } -// Del implements gb28181.ChannelStorer. -func (c *Channel) Del(ctx context.Context, ch *gb28181.Channel, opts ...orm.QueryOption) error { +// Del implements ipc.ChannelStorer. +func (c *Channel) Del(ctx context.Context, ch *ipc.Channel, opts ...orm.QueryOption) error { return c.Storer.Channel().Del(ctx, ch, opts...) } -// Edit implements gb28181.ChannelStorer. -func (c *Channel) Edit(ctx context.Context, ch *gb28181.Channel, changeFn func(*gb28181.Channel), opts ...orm.QueryOption) error { +// Edit implements ipc.ChannelStorer. +func (c *Channel) Edit(ctx context.Context, ch *ipc.Channel, changeFn func(*ipc.Channel), opts ...orm.QueryOption) error { return c.Storer.Channel().Edit(ctx, ch, changeFn, opts...) } -// Find implements gb28181.ChannelStorer. -func (c *Channel) Find(ctx context.Context, chs *[]*gb28181.Channel, pager orm.Pager, opts ...orm.QueryOption) (int64, error) { +// Find implements ipc.ChannelStorer. +func (c *Channel) Find(ctx context.Context, chs *[]*ipc.Channel, pager orm.Pager, opts ...orm.QueryOption) (int64, error) { return c.Storer.Channel().Find(ctx, chs, pager, opts...) } -// Get implements gb28181.ChannelStorer. -func (c *Channel) Get(ctx context.Context, ch *gb28181.Channel, opts ...orm.QueryOption) error { +// Get implements ipc.ChannelStorer. +func (c *Channel) Get(ctx context.Context, ch *ipc.Channel, opts ...orm.QueryOption) error { return c.Storer.Channel().Get(ctx, ch, opts...) } diff --git a/internal/core/ipc/store/ipccache/device.go b/internal/core/ipc/store/ipccache/device.go index 9b6726a..5857ce6 100644 --- a/internal/core/ipc/store/ipccache/device.go +++ b/internal/core/ipc/store/ipccache/device.go @@ -5,28 +5,28 @@ import ( "fmt" "log/slog" - gb28181 "github.com/gowvp/gb28181/internal/core/ipc" + "github.com/gowvp/gb28181/internal/core/ipc" "github.com/gowvp/gb28181/pkg/gbs" "github.com/ixugo/goddd/pkg/orm" "gorm.io/gorm" "gorm.io/gorm/clause" ) -var _ gb28181.DeviceStorer = &Device{} +var _ ipc.DeviceStorer = &Device{} type Device = Cache -// Add implements gb28181.DeviceStorer. -func (d *Device) Add(ctx context.Context, dev *gb28181.Device) error { +// Add implements ipc.DeviceStorer. +func (d *Device) Add(ctx context.Context, dev *ipc.Device) error { if err := d.Storer.Device().Add(ctx, dev); err != nil { return err } - d.devices.LoadOrStore(dev.DeviceID, gbs.NewDevice(nil, dev)) + d.devices.LoadOrStore(dev.GetGB28181DeviceID(), gbs.NewDevice(nil, dev)) return nil } -// Del implements gb28181.DeviceStorer. -func (d *Device) Del(ctx context.Context, dev *gb28181.Device, opts ...orm.QueryOption) error { +// Del implements ipc.DeviceStorer. +func (d *Device) Del(ctx context.Context, dev *ipc.Device, opts ...orm.QueryOption) error { if err := d.Storer.Device().Session( ctx, func(tx *gorm.DB) error { @@ -37,29 +37,29 @@ func (d *Device) Del(ctx context.Context, dev *gb28181.Device, opts ...orm.Query return db.Delete(dev).Error }, func(tx *gorm.DB) error { - return tx.Model(&gb28181.Channel{}).Where("did=?", dev.ID).Delete(nil).Error + return tx.Model(&ipc.Channel{}).Where("did=?", dev.ID).Delete(nil).Error }, ); err != nil { return err } - d.devices.Delete(dev.DeviceID) + d.devices.Delete(dev.GetGB28181DeviceID()) return nil } -// Edit implements gb28181.DeviceStorer. -func (d *Device) Edit(ctx context.Context, dev *gb28181.Device, changeFn func(*gb28181.Device), opts ...orm.QueryOption) error { +// Edit implements ipc.DeviceStorer. +func (d *Device) Edit(ctx context.Context, dev *ipc.Device, changeFn func(*ipc.Device), opts ...orm.QueryOption) error { if err := d.Storer.Device().Edit(ctx, dev, changeFn, opts...); err != nil { return err } - dev2, ok := d.devices.Load(dev.DeviceID) + dev2, ok := d.devices.Load(dev.GetGB28181DeviceID()) if !ok { return fmt.Errorf("edit device not found") } // 密码修改,设备需要重新注册 if dev2.Password != dev.Password && dev.Password != "" { slog.InfoContext(ctx, " 修改密码,设备离线") - d.Change(dev.DeviceID, func(d *gb28181.Device) { + d.Change(dev.GetGB28181DeviceID(), func(d *ipc.Device) { d.Password = dev.Password d.IsOnline = false }, func(d *gbs.Device) { @@ -68,17 +68,17 @@ func (d *Device) Edit(ctx context.Context, dev *gb28181.Device, changeFn func(*g return nil } -// Find implements gb28181.DeviceStorer. -func (d *Device) Find(ctx context.Context, devs *[]*gb28181.Device, pager orm.Pager, opts ...orm.QueryOption) (int64, error) { +// Find implements ipc.DeviceStorer. +func (d *Device) Find(ctx context.Context, devs *[]*ipc.Device, pager orm.Pager, opts ...orm.QueryOption) (int64, error) { return d.Storer.Device().Find(ctx, devs, pager, opts...) } -// Get implements gb28181.DeviceStorer. -func (d *Device) Get(ctx context.Context, dev *gb28181.Device, opts ...orm.QueryOption) error { +// Get implements ipc.DeviceStorer. +func (d *Device) Get(ctx context.Context, dev *ipc.Device, opts ...orm.QueryOption) error { return d.Storer.Device().Get(ctx, dev, opts...) } -// Session implements gb28181.DeviceStorer. +// Session implements ipc.DeviceStorer. func (d *Device) Session(ctx context.Context, changeFns ...func(*gorm.DB) error) error { return d.Storer.Device().Session(ctx, changeFns...) } diff --git a/internal/core/ipc/store/ipcdb/channel.go b/internal/core/ipc/store/ipcdb/channel.go index e263bf9..b7e9603 100755 --- a/internal/core/ipc/store/ipcdb/channel.go +++ b/internal/core/ipc/store/ipcdb/channel.go @@ -4,22 +4,22 @@ package ipcdb import ( "context" - gb28181 "github.com/gowvp/gb28181/internal/core/ipc" + "github.com/gowvp/gb28181/internal/core/ipc" "github.com/ixugo/goddd/pkg/orm" "gorm.io/gorm" ) -var _ gb28181.ChannelStorer = Channel{} +var _ ipc.ChannelStorer = Channel{} // Channel Related business namespaces type Channel DB -// BatchEdit implements gb28181.ChannelStorer. +// BatchEdit implements ipc.ChannelStorer. func (d Channel) BatchEdit(ctx context.Context, column string, value any, args ...orm.QueryOption) error { if len(args) == 0 { panic("没有查询条件") } - db := d.db.WithContext(ctx).Model(new(gb28181.Channel)) + db := d.db.WithContext(ctx).Model(new(ipc.Channel)) for _, fn := range args { fn(db) } @@ -31,28 +31,28 @@ func NewChannel(db *gorm.DB) Channel { return Channel{db: db} } -// Find implements gb28181.ChannelStorer. -func (d Channel) Find(ctx context.Context, bs *[]*gb28181.Channel, page orm.Pager, opts ...orm.QueryOption) (int64, error) { +// Find implements ipc.ChannelStorer. +func (d Channel) Find(ctx context.Context, bs *[]*ipc.Channel, page orm.Pager, opts ...orm.QueryOption) (int64, error) { return orm.FindWithContext(ctx, d.db, bs, page, opts...) } -// Get implements gb28181.ChannelStorer. -func (d Channel) Get(ctx context.Context, model *gb28181.Channel, opts ...orm.QueryOption) error { +// Get implements ipc.ChannelStorer. +func (d Channel) Get(ctx context.Context, model *ipc.Channel, opts ...orm.QueryOption) error { return orm.FirstWithContext(ctx, d.db, model, opts...) } -// Add implements gb28181.ChannelStorer. -func (d Channel) Add(ctx context.Context, model *gb28181.Channel) error { +// Add implements ipc.ChannelStorer. +func (d Channel) Add(ctx context.Context, model *ipc.Channel) error { return d.db.WithContext(ctx).Create(model).Error } -// Edit implements gb28181.ChannelStorer. -func (d Channel) Edit(ctx context.Context, model *gb28181.Channel, changeFn func(*gb28181.Channel), opts ...orm.QueryOption) error { +// Edit implements ipc.ChannelStorer. +func (d Channel) Edit(ctx context.Context, model *ipc.Channel, changeFn func(*ipc.Channel), opts ...orm.QueryOption) error { return orm.UpdateWithContext(ctx, d.db, model, changeFn, opts...) } -// Del implements gb28181.ChannelStorer. -func (d Channel) Del(ctx context.Context, model *gb28181.Channel, opts ...orm.QueryOption) error { +// Del implements ipc.ChannelStorer. +func (d Channel) Del(ctx context.Context, model *ipc.Channel, opts ...orm.QueryOption) error { return orm.DeleteWithContext(ctx, d.db, model, opts...) } @@ -67,6 +67,6 @@ func (d Channel) Session(ctx context.Context, changeFns ...func(*gorm.DB) error) }) } -func (d Channel) EditWithSession(tx *gorm.DB, model *gb28181.Channel, changeFn func(b *gb28181.Channel) error, opts ...orm.QueryOption) error { +func (d Channel) EditWithSession(tx *gorm.DB, model *ipc.Channel, changeFn func(b *ipc.Channel) error, opts ...orm.QueryOption) error { return orm.UpdateWithSession(tx, model, changeFn, opts...) } diff --git a/internal/core/ipc/store/ipcdb/db.go b/internal/core/ipc/store/ipcdb/db.go index f8d78e3..f57626a 100755 --- a/internal/core/ipc/store/ipcdb/db.go +++ b/internal/core/ipc/store/ipcdb/db.go @@ -2,11 +2,11 @@ package ipcdb import ( - gb28181 "github.com/gowvp/gb28181/internal/core/ipc" + "github.com/gowvp/gb28181/internal/core/ipc" "gorm.io/gorm" ) -var _ gb28181.Storer = DB{} +var _ ipc.Storer = DB{} // DB Related business namespaces type DB struct { @@ -19,12 +19,12 @@ func NewDB(db *gorm.DB) DB { } // Device Get business instance -func (d DB) Device() gb28181.DeviceStorer { +func (d DB) Device() ipc.DeviceStorer { return Device(d) } // Channel Get business instance -func (d DB) Channel() gb28181.ChannelStorer { +func (d DB) Channel() ipc.ChannelStorer { return Channel(d) } @@ -34,8 +34,8 @@ func (d DB) AutoMigrate(ok bool) DB { return d } if err := d.db.AutoMigrate( - new(gb28181.Device), - new(gb28181.Channel), + new(ipc.Device), + new(ipc.Channel), ); err != nil { panic(err) } diff --git a/internal/core/ipc/store/ipcdb/device.go b/internal/core/ipc/store/ipcdb/device.go index 635b6b6..d5ec0e6 100755 --- a/internal/core/ipc/store/ipcdb/device.go +++ b/internal/core/ipc/store/ipcdb/device.go @@ -4,12 +4,12 @@ package ipcdb import ( "context" - gb28181 "github.com/gowvp/gb28181/internal/core/ipc" + "github.com/gowvp/gb28181/internal/core/ipc" "github.com/ixugo/goddd/pkg/orm" "gorm.io/gorm" ) -var _ gb28181.DeviceStorer = Device{} +var _ ipc.DeviceStorer = Device{} // Device Related business namespaces type Device DB @@ -19,28 +19,28 @@ func NewDevice(db *gorm.DB) Device { return Device{db: db} } -// Find implements gb28181.DeviceStorer. -func (d Device) Find(ctx context.Context, bs *[]*gb28181.Device, page orm.Pager, opts ...orm.QueryOption) (int64, error) { +// Find implements ipc.DeviceStorer. +func (d Device) Find(ctx context.Context, bs *[]*ipc.Device, page orm.Pager, opts ...orm.QueryOption) (int64, error) { return orm.FindWithContext(ctx, d.db, bs, page, opts...) } -// Get implements gb28181.DeviceStorer. -func (d Device) Get(ctx context.Context, model *gb28181.Device, opts ...orm.QueryOption) error { +// Get implements ipc.DeviceStorer. +func (d Device) Get(ctx context.Context, model *ipc.Device, opts ...orm.QueryOption) error { return orm.FirstWithContext(ctx, d.db, model, opts...) } -// Add implements gb28181.DeviceStorer. -func (d Device) Add(ctx context.Context, model *gb28181.Device) error { +// Add implements ipc.DeviceStorer. +func (d Device) Add(ctx context.Context, model *ipc.Device) error { return d.db.WithContext(ctx).Create(model).Error } -// Edit implements gb28181.DeviceStorer. -func (d Device) Edit(ctx context.Context, model *gb28181.Device, changeFn func(*gb28181.Device), opts ...orm.QueryOption) error { +// Edit implements ipc.DeviceStorer. +func (d Device) Edit(ctx context.Context, model *ipc.Device, changeFn func(*ipc.Device), opts ...orm.QueryOption) error { return orm.UpdateWithContext(ctx, d.db, model, changeFn, opts...) } -// Del implements gb28181.DeviceStorer. -func (d Device) Del(ctx context.Context, model *gb28181.Device, opts ...orm.QueryOption) error { +// Del implements ipc.DeviceStorer. +func (d Device) Del(ctx context.Context, model *ipc.Device, opts ...orm.QueryOption) error { return orm.DeleteWithContext(ctx, d.db, model, opts...) } @@ -55,6 +55,6 @@ func (d Device) Session(ctx context.Context, changeFns ...func(*gorm.DB) error) }) } -func (d Device) EditWithSession(tx *gorm.DB, model *gb28181.Device, changeFn func(b *gb28181.Device) error, opts ...orm.QueryOption) error { +func (d Device) EditWithSession(tx *gorm.DB, model *ipc.Device, changeFn func(b *ipc.Device) error, opts ...orm.QueryOption) error { return orm.UpdateWithSession(tx, model, changeFn, opts...) } diff --git a/internal/core/port/protocol.go b/internal/core/port/protocol.go deleted file mode 100644 index 2636c6b..0000000 --- a/internal/core/port/protocol.go +++ /dev/null @@ -1,50 +0,0 @@ -package port - -import ( - "context" -) - -// Device 设备接口(避免循环依赖) -// 协议适配器的实现会接收具体的 gb28181.Device 类型 -type Device interface { - GetID() string - GetDeviceID() string - GetType() string - GetIP() string - GetPort() int - GetUsername() string - GetPassword() string -} - -// Channel 通道接口(避免循环依赖) -type Channel interface { - GetID() string - GetChannelID() string - GetDeviceID() string -} - -// Protocol 协议抽象接口(端口) -type Protocol interface { - // ValidateDevice 验证设备连接(添加设备前调用) - ValidateDevice(ctx context.Context, device Device) error - - // InitDevice 初始化设备连接(添加设备后调用) - // 例如: GB28181 不需要主动初始化,ONVIF 需要查询 Profiles 作为通道 - InitDevice(ctx context.Context, device Device) error - - // QueryCatalog 查询设备目录/通道 - QueryCatalog(ctx context.Context, device Device) error - - // StartPlay 开始播放 - StartPlay(ctx context.Context, device Device, channel Channel) (*PlayResponse, error) - - // StopPlay 停止播放 - StopPlay(ctx context.Context, device Device, channel Channel) error -} - -// PlayResponse 播放响应 -type PlayResponse struct { - SSRC string // GB28181 SSRC - Stream string // 流 ID - RTSP string // RTSP 地址 (ONVIF) -} diff --git a/internal/core/sms/node_manager.go b/internal/core/sms/node_manager.go index e360a2f..b6e586a 100644 --- a/internal/core/sms/node_manager.go +++ b/internal/core/sms/node_manager.go @@ -194,6 +194,7 @@ func (n *NodeManager) connection(server *MediaServer, serverPort int) error { log.Info("ZLM 服务节点配置设置") hookPrefix := fmt.Sprintf("http://%s:%d/webhook", server.HookIP, serverPort) + req := zlm.SetServerConfigRequest{ RtcExternIP: zlm.NewString(server.IP), GeneralMediaServerID: zlm.NewString(server.ID), @@ -232,6 +233,7 @@ func (n *NodeManager) connection(server *MediaServer, serverPort int) error { // 优化此消息以更快的收到流注销事件 ProtocolContinuePushMs: zlm.NewString("3000"), RtpProxyPortRange: &server.RTPPortRange, + FfmpegLog: zlm.NewString("./fflogs/ffmpeg.log"), } { diff --git a/internal/protocol/onvif/adapter.go b/internal/protocol/onvif/adapter.go deleted file mode 100644 index 0bfc06c..0000000 --- a/internal/protocol/onvif/adapter.go +++ /dev/null @@ -1,157 +0,0 @@ -package onvifadapter - -import ( - "context" - "fmt" - "log/slog" - - gb28181 "github.com/gowvp/gb28181/internal/core/ipc" - "github.com/gowvp/gb28181/internal/core/port" - "github.com/gowvp/onvif" - m "github.com/gowvp/onvif/media" - sdkmedia "github.com/gowvp/onvif/sdk/media" - "github.com/ixugo/goddd/pkg/orm" -) - -var _ port.Protocol = (*Adapter)(nil) - -// Adapter ONVIF 协议适配器 -type Adapter struct { - devices map[string]*onvif.Device // ONVIF 设备连接缓存 - store gb28181.Storer // 协议适配器可以依赖领域的存储接口 -} - -func NewAdapter(store gb28181.Storer) *Adapter { - return &Adapter{ - devices: make(map[string]*onvif.Device), - store: store, - } -} - -// ValidateDevice 实现 port.Protocol 接口 - ONVIF 设备验证 -func (a *Adapter) ValidateDevice(ctx context.Context, device port.Device) error { - // 尝试连接 ONVIF 设备并验证可以获取 Profiles - dev, err := onvif.NewDevice(onvif.DeviceParams{ - Xaddr: fmt.Sprintf("%s:%d", device.GetIP(), device.GetPort()), - Username: device.GetUsername(), - Password: device.GetPassword(), - }) - if err != nil { - return fmt.Errorf("ONVIF 连接失败: %w", err) - } - - // 验证可以获取 Profiles - _, err = sdkmedia.Call_GetProfiles(ctx, dev, m.GetProfiles{}) - if err != nil { - return fmt.Errorf("获取 ONVIF Profiles 失败: %w", err) - } - - return nil -} - -// InitDevice 实现 port.Protocol 接口 - 初始化 ONVIF 设备 -// ONVIF 设备初始化时,自动查询 Profiles 并创建为通道 -func (a *Adapter) InitDevice(ctx context.Context, device port.Device) error { - // 创建 ONVIF 连接 - dev, err := onvif.NewDevice(onvif.DeviceParams{ - Xaddr: fmt.Sprintf("%s:%d", device.GetIP(), device.GetPort()), - Username: device.GetUsername(), - Password: device.GetPassword(), - }) - if err != nil { - return err - } - - // 缓存设备连接 - a.devices[device.GetID()] = dev - - // 自动查询 Profiles 作为通道 - return a.queryAndSaveProfiles(ctx, device, dev) -} - -// QueryCatalog 实现 port.Protocol 接口 - ONVIF 查询 Profiles -func (a *Adapter) QueryCatalog(ctx context.Context, device port.Device) error { - dev, ok := a.devices[device.GetID()] - if !ok { - // 设备连接不在缓存中,尝试重新连接 - var err error - dev, err = onvif.NewDevice(onvif.DeviceParams{ - Xaddr: fmt.Sprintf("%s:%d", device.GetIP(), device.GetPort()), - Username: device.GetUsername(), - Password: device.GetPassword(), - }) - if err != nil { - return fmt.Errorf("ONVIF 设备未初始化: %w", err) - } - a.devices[device.GetID()] = dev - } - - return a.queryAndSaveProfiles(ctx, device, dev) -} - -// StartPlay 实现 port.Protocol 接口 - ONVIF 播放 -func (a *Adapter) StartPlay(ctx context.Context, device port.Device, channel port.Channel) (*port.PlayResponse, error) { - dev, ok := a.devices[device.GetID()] - if !ok { - return nil, fmt.Errorf("ONVIF 设备未初始化") - } - - // 获取 RTSP 地址 - streamURI, err := a.getStreamURI(ctx, dev, channel.GetChannelID()) - if err != nil { - return nil, err - } - - return &port.PlayResponse{ - RTSP: streamURI, - }, nil -} - -// StopPlay 实现 port.Protocol 接口 - ONVIF 停止播放 -func (a *Adapter) StopPlay(ctx context.Context, device port.Device, channel port.Channel) error { - // ONVIF 通常不需要显式停止播放 - return nil -} - -// queryAndSaveProfiles 查询 ONVIF Profiles 并保存为通道 -func (a *Adapter) queryAndSaveProfiles(ctx context.Context, device port.Device, dev *onvif.Device) error { - // 查询 ONVIF Profiles - resp, err := sdkmedia.Call_GetProfiles(ctx, dev, m.GetProfiles{}) - if err != nil { - return fmt.Errorf("获取 ONVIF Profiles 失败: %w", err) - } - - // 将 Profiles 转换为通道并保存 - for _, profile := range resp.Profiles { - channel := &gb28181.Channel{ - DeviceID: device.GetDeviceID(), - ChannelID: string(profile.Token), - Name: string(profile.Name), - DID: device.GetID(), - } - - // 保存到数据库(使用领域层的存储接口) - if err := a.store.Channel().Add(ctx, channel); err != nil { - // 如果是重复错误,忽略 - if orm.IsDuplicatedKey(err) { - slog.DebugContext(ctx, "通道已存在", "channel_id", channel.ChannelID) - continue - } - slog.ErrorContext(ctx, "保存通道失败", "err", err, "channel_id", channel.ChannelID) - continue - } - slog.InfoContext(ctx, "ONVIF Profile 保存为通道", "channel_id", channel.ChannelID, "name", channel.Name) - } - - return nil -} - -// getStreamURI 获取 RTSP 流地址 -func (a *Adapter) getStreamURI(ctx context.Context, dev *onvif.Device, profileToken string) (string, error) { - // TODO: 调用 ONVIF GetStreamUri 方法 - // 这里需要根据 onvif SDK 的实际 API 来实现 - - // 临时实现:假设 profileToken 可以直接构造 RTSP 地址 - params := dev.GetDeviceParams() - return fmt.Sprintf("rtsp://%s:%s@%s/stream/%s", params.Username, params.Password, params.Xaddr, profileToken), nil -} diff --git a/internal/web/api/ipc.go b/internal/web/api/ipc.go index 05a425c..db287d5 100755 --- a/internal/web/api/ipc.go +++ b/internal/web/api/ipc.go @@ -2,25 +2,24 @@ package api import ( - "encoding/json" "fmt" "io" "log/slog" "os" "path/filepath" + "slices" "sort" "strings" "time" "github.com/gin-gonic/gin" "github.com/google/uuid" + "github.com/gowvp/gb28181/internal/adapter/onvifadapter" "github.com/gowvp/gb28181/internal/core/bz" "github.com/gowvp/gb28181/internal/core/ipc" - "github.com/gowvp/gb28181/internal/core/port" "github.com/gowvp/gb28181/internal/core/push" "github.com/gowvp/gb28181/internal/core/sms" "github.com/gowvp/gb28181/pkg/zlm" - "github.com/gowvp/onvif" "github.com/ixugo/goddd/domain/uniqueid" "github.com/ixugo/goddd/pkg/hook" "github.com/ixugo/goddd/pkg/orm" @@ -61,7 +60,7 @@ func NewIPCAPI(core ipc.Core) IPCAPI { return IPCAPI{ipc: core} } -func NewIPCCore(store ipc.Storer, uni uniqueid.Core, protocols map[string]port.Protocol) ipc.Core { +func NewIPCCore(store ipc.Storer, uni uniqueid.Core, protocols map[string]ipc.Protocoler) ipc.Core { return ipc.NewCore(store, uni, protocols) } @@ -92,8 +91,9 @@ func registerGB28181(g gin.IRouter, api IPCAPI, handler ...gin.HandlerFunc) { group.POST("/:id/catalog", web.WrapH(api.queryCatalog)) // 刷新通道(GB28181 特有) } { - group := g.Group("/onvif", handler...) - group.GET("/discover", api.discover) // ONVIF 设备发现(ONVIF 特有) + // group := g.Group("/onvif", handler...) + g.GET("/onvif/discover", api.discover) // ONVIF 设备发现(ONVIF 特有) + // group.GET("/discover", api.discover) // ONVIF 设备发现(ONVIF 特有) } // 统一的通道管理 API(支持所有协议) @@ -137,6 +137,10 @@ func (a IPCAPI) editDevice(c *gin.Context, in *ipc.EditDeviceInput) (any, error) // POST /devices // { "type": "ONVIF", "ip": "192.168.1.100", "port": 80, "username": "admin", "password": "12345" } func (a IPCAPI) addDevice(c *gin.Context, in *ipc.AddDeviceInput) (any, error) { + in.Type = strings.ToUpper(in.Type) + if !slices.Contains([]string{ipc.TypeGB28181, ipc.TypeOnvif}, in.Type) { + return nil, reason.ErrBadRequest.SetMsg("不支持的设备类型") + } return a.ipc.AddDevice(c.Request.Context(), in) } @@ -196,7 +200,7 @@ func (a IPCAPI) play(c *gin.Context, _ *struct{}) (*playOutput, error) { var app, appStream, host, stream, session, mediaServerID string // 国标逻辑 - if strings.HasPrefix(channelID, bz.IDPrefixGBChannel) { + if bz.IsGB28181(channelID) { // 防止错误的配置,无法收到流 if a.uc.Conf.Media.SDPIP == "127.0.0.1" { return nil, reason.ErrUsedLogic.SetMsg("请先配置流媒体 SDP 收流地址") @@ -212,7 +216,7 @@ func (a IPCAPI) play(c *gin.Context, _ *struct{}) (*playOutput, error) { mediaServerID = sms.DefaultMediaServerID - } else if strings.HasPrefix(channelID, bz.IDPrefixRTMP) { + } else if bz.IsRTMP(channelID) { pu, err := a.uc.MediaAPI.pushCore.GetStreamPush(c.Request.Context(), channelID) if err != nil { return nil, err @@ -227,7 +231,7 @@ func (a IPCAPI) play(c *gin.Context, _ *struct{}) (*playOutput, error) { if !pu.IsAuthDisabled && pu.Session != "" { session = "session=" + pu.Session } - } else if strings.HasPrefix(channelID, bz.IDPrefixRTSP) { + } else if bz.IsRTSP(channelID) { proxy, err := a.uc.ProxyAPI.proxyCore.GetStreamProxy(c.Request.Context(), channelID) if err != nil { return nil, err @@ -235,6 +239,10 @@ func (a IPCAPI) play(c *gin.Context, _ *struct{}) (*playOutput, error) { app = proxy.App appStream = proxy.Stream mediaServerID = sms.DefaultMediaServerID + } else if bz.IsOnvif(channelID) { + app = "rtp" + appStream = channelID + mediaServerID = sms.DefaultMediaServerID } else { return nil, reason.ErrNotFound.SetMsg("不支持的播放通道") } @@ -299,7 +307,7 @@ func (a IPCAPI) play(c *gin.Context, _ *struct{}) (*playOutput, error) { // 取一张快照 go func() { for range 2 { - time.Sleep(5 * time.Second) + time.Sleep(3 * time.Second) rtsp := fmt.Sprintf("rtsp://%s:%d/%s", "127.0.0.1", svr.Ports.RTSP, stream) + "?" + session body, err := a.uc.SMSAPI.smsCore.GetSnapshot(svr, zlm.GetSnapRequest{ URL: rtsp, @@ -382,27 +390,13 @@ func (a IPCAPI) getSnapshot(c *gin.Context) { c.Data(200, "image/jpeg", body) } -type DiscoverResponse struct { - Addr string `json:"addr"` -} - -func toDiscoverResponse(dev *onvif.Device) *DiscoverResponse { - addr := dev.GetDeviceParams().Xaddr - if !strings.Contains(addr, ":") { - addr += ":80" - } - return &DiscoverResponse{ - Addr: addr, - } -} - func (a IPCAPI) discover(c *gin.Context) { - recv, cancel, err := onvif.AllAvailableDevicesAtSpecificEthernetInterfaces() - if err != nil { - web.Fail(c, err) + p := a.ipc.GetProtocol(ipc.TypeOnvif) + onvifAdapter, ok := p.(*onvifadapter.Adapter) + if !ok { + web.Fail(c, reason.ErrNotFound.SetMsg("不支持的协议")) return } - defer cancel() se := web.NewSSE(64, time.Minute) go func() { @@ -413,27 +407,25 @@ func (a IPCAPI) discover(c *gin.Context) { }) se.Close() }() - for { - select { - case dev := <-recv: - if dev == nil { - return - } - // TODO: 已经添加的设备需要过滤掉 - b, _ := json.Marshal(toDiscoverResponse(dev)) - se.Publish(web.Event{ - ID: uuid.NewString(), - Event: "discover", - Data: b, - }) - time.Sleep(time.Millisecond * 200) - case <-c.Request.Context().Done(): - return - case <-time.After(3 * time.Second): - slog.DebugContext(c.Request.Context(), "discover timeout") - return - } + w := IOWriter{fn: func(b []byte) (int, error) { + se.Publish(web.Event{ + ID: uuid.NewString(), + Event: "discover", + Data: b, + }) + return len(b), nil + }} + if err := onvifAdapter.Discover(c.Request.Context(), w); err != nil { + slog.ErrorContext(c.Request.Context(), "discover", "err", err) } }() se.ServeHTTP(c.Writer, c.Request) } + +type IOWriter struct { + fn func(b []byte) (int, error) +} + +func (w IOWriter) Write(b []byte) (int, error) { + return w.fn(b) +} diff --git a/internal/web/api/provider.go b/internal/web/api/provider.go index 1dc8dbb..ce4615b 100644 --- a/internal/web/api/provider.go +++ b/internal/web/api/provider.go @@ -5,14 +5,17 @@ import ( "github.com/gin-gonic/gin" "github.com/google/wire" + "github.com/gowvp/gb28181/internal/adapter/gbadapter" + "github.com/gowvp/gb28181/internal/adapter/onvifadapter" + "github.com/gowvp/gb28181/internal/adapter/rtspadapter" "github.com/gowvp/gb28181/internal/conf" "github.com/gowvp/gb28181/internal/core/ipc" "github.com/gowvp/gb28181/internal/core/ipc/store/ipccache" "github.com/gowvp/gb28181/internal/core/ipc/store/ipcdb" - "github.com/gowvp/gb28181/internal/core/port" + "github.com/gowvp/gb28181/internal/core/proxy" "github.com/gowvp/gb28181/internal/core/push" "github.com/gowvp/gb28181/internal/core/push/store/pushdb" - onvifadapter "github.com/gowvp/gb28181/internal/protocol/onvif" + "github.com/gowvp/gb28181/internal/core/sms" "github.com/gowvp/gb28181/pkg/gbs" "github.com/ixugo/goddd/domain/uniqueid" "github.com/ixugo/goddd/domain/uniqueid/store/uniqueiddb" @@ -34,7 +37,7 @@ var ( NewPushCore, NewPushAPI, gbs.NewServer, NewIPCStore, NewProtocols, NewIPCCore, NewIPCAPI, NewGBAdapter, - NewProxyAPI, + NewProxyAPI, NewProxyCore, NewConfigAPI, NewUserAPI, ) @@ -95,22 +98,18 @@ func NewIPCStore(db *gorm.DB) ipc.Storer { return ipccache.NewCache(ipcdb.NewDB(db).AutoMigrate(orm.GetEnabledAutoMigrate())) } -func NewGBAdapter(store ipc.Storer, uni uniqueid.Core) ipc.GBDAdapter { - return ipc.NewGBAdapter( +func NewGBAdapter(store ipc.Storer, uni uniqueid.Core) ipc.Adapter { + return ipc.NewAdapter( store, uni, ) } // NewProtocols 创建协议适配器映射 -func NewProtocols(store ipc.Storer) map[string]port.Protocol { - protocols := make(map[string]port.Protocol) - - // 注册 ONVIF 协议适配器 - protocols["ONVIF"] = onvifadapter.NewAdapter(store) - - // TODO: 注册 GB28181 协议适配器 - // protocols["GB28181"] = gb28181adapter.NewAdapter(sipServer, store) - +func NewProtocols(adapter ipc.Adapter, sms sms.Core, proxyCore *proxy.Core, gbs *gbs.Server) map[string]ipc.Protocoler { + protocols := make(map[string]ipc.Protocoler) + protocols[ipc.TypeOnvif] = onvifadapter.NewAdapter(adapter, sms) + protocols[ipc.TypeRTSP] = rtspadapter.NewAdapter(proxyCore, sms) + protocols[ipc.TypeGB28181] = gbadapter.NewAdapter(adapter, gbs, sms) return protocols } diff --git a/internal/web/api/proxy.go b/internal/web/api/proxy.go index b9a8fb0..44ac02b 100755 --- a/internal/web/api/proxy.go +++ b/internal/web/api/proxy.go @@ -15,9 +15,12 @@ type ProxyAPI struct { proxyCore *proxy.Core } -func NewProxyAPI(db *gorm.DB, uni uniqueid.Core) ProxyAPI { - core := proxy.NewCore(proxydb.NewDB(db).AutoMigrate(orm.GetEnabledAutoMigrate()), uni) - return ProxyAPI{proxyCore: core} +func NewProxyCore(db *gorm.DB, uni uniqueid.Core) *proxy.Core { + return proxy.NewCore(proxydb.NewDB(db).AutoMigrate(orm.GetEnabledAutoMigrate()), uni) +} + +func NewProxyAPI(proxyCore *proxy.Core) ProxyAPI { + return ProxyAPI{proxyCore: proxyCore} } func registerProxy(g gin.IRouter, api ProxyAPI, handler ...gin.HandlerFunc) { diff --git a/internal/web/api/sms.go b/internal/web/api/sms.go index bb3cb43..5246b1c 100755 --- a/internal/web/api/sms.go +++ b/internal/web/api/sms.go @@ -18,7 +18,7 @@ type SmsAPI struct { } func NewSMSCore(db *gorm.DB, cfg *conf.Bootstrap) sms.Core { - core := sms.NewCore(smsdb.NewDB(db).AutoMigrate(orm.EnabledAutoMigrate)) + core := sms.NewCore(smsdb.NewDB(db).AutoMigrate(orm.GetEnabledAutoMigrate())) if err := core.Run(cfg, cfg.Server.HTTP.Port); err != nil { panic(err) } diff --git a/internal/web/api/zlm_webhook.go b/internal/web/api/zlm_webhook.go index 0af6380..429d759 100644 --- a/internal/web/api/zlm_webhook.go +++ b/internal/web/api/zlm_webhook.go @@ -1,33 +1,32 @@ package api import ( - "context" "log/slog" "net/url" - "strings" "github.com/gin-gonic/gin" "github.com/gowvp/gb28181/internal/conf" "github.com/gowvp/gb28181/internal/core/bz" - gb28181 "github.com/gowvp/gb28181/internal/core/ipc" + "github.com/gowvp/gb28181/internal/core/ipc" "github.com/gowvp/gb28181/internal/core/push" "github.com/gowvp/gb28181/internal/core/sms" "github.com/gowvp/gb28181/pkg/gbs" - "github.com/gowvp/gb28181/pkg/zlm" "github.com/ixugo/goddd/pkg/web" ) type WebHookAPI struct { smsCore sms.Core mediaCore push.Core - gb28181Core gb28181.Core + gb28181Core ipc.Core conf *conf.Bootstrap log *slog.Logger gbs *gbs.Server uc *Usecase + + protocols map[string]ipc.Protocoler } -func NewWebHookAPI(core sms.Core, mediaCore push.Core, conf *conf.Bootstrap, gbs *gbs.Server, gb28181 gb28181.Core) WebHookAPI { +func NewWebHookAPI(core sms.Core, mediaCore push.Core, conf *conf.Bootstrap, gbs *gbs.Server, gb28181 ipc.Core, protocols map[string]ipc.Protocoler) WebHookAPI { return WebHookAPI{ smsCore: core, mediaCore: mediaCore, @@ -35,6 +34,7 @@ func NewWebHookAPI(core sms.Core, mediaCore push.Core, conf *conf.Bootstrap, gbs log: slog.With("hook", "zlm"), gbs: gbs, gb28181Core: gb28181, + protocols: protocols, } } @@ -88,29 +88,23 @@ func (w WebHookAPI) onPublish(c *gin.Context, in *onPublishInput) (*onPublishOut // https://docs.zlmediakit.com/zh/guide/media_server/web_hook_api.html#_12%E3%80%81on-stream-changed func (w WebHookAPI) onStreamChanged(c *gin.Context, in *onStreamChangedInput) (DefaultOutput, error) { w.log.InfoContext(c.Request.Context(), "webhook onStreamChanged", "app", in.App, "stream", in.Stream, "schema", in.Schema, "mediaServerID", in.MediaServerID, "regist", in.Regist) - if in.App == "rtp" { - // 防止多次触发 - if in.Schema == "rtmp" && !in.Regist { - ch, err := w.gb28181Core.GetChannel(c.Request.Context(), in.Stream) - if err != nil { - w.log.WarnContext(c.Request.Context(), "webhook onStreamChanged", "err", err) - return newDefaultOutputOK(), nil - } - w.gbs.StopPlay(&gbs.StopPlayInput{Channel: ch}) + if in.Regist || in.Schema != "rtmp" { + return newDefaultOutputOK(), nil + } + + // TODO: 待重构 + if bz.IsRTMP(in.Stream) { + if err := w.mediaCore.UnPublish(c.Request.Context(), in.App, in.Stream); err != nil { + w.log.ErrorContext(c.Request.Context(), "webhook onStreamChanged", "err", err) } return newDefaultOutputOK(), nil } - if strings.HasPrefix(in.Stream, bz.IDPrefixRTSP) { - return newDefaultOutputOK(), nil - } - - switch in.Schema { - case "rtmp": - if !in.Regist { - if err := w.mediaCore.UnPublish(c.Request.Context(), in.App, in.Stream); err != nil { - w.log.ErrorContext(c.Request.Context(), "webhook onStreamChanged", "err", err) - } + r := ipc.GetType(in.Stream) + protocol, ok := w.protocols[r] + if ok { + if err := protocol.OnStreamChanged(c.Request.Context(), in.Stream); err != nil { + slog.ErrorContext(c.Request.Context(), "webhook onStreamChanged", "err", err) } } return newDefaultOutputOK(), nil @@ -155,16 +149,6 @@ func (w WebHookAPI) onPlay(c *gin.Context, in *onPublishInput) (DefaultOutput, e func (w WebHookAPI) onStreamNoneReader(c *gin.Context, in *onStreamNoneReaderInput) (onStreamNoneReaderOutput, error) { // rtmp 无人观看时,也允许推流 w.log.InfoContext(c.Request.Context(), "webhook onStreamNoneReader", "app", in.App, "stream", in.Stream, "mediaServerID", in.MediaServerID) - - if in.App == "rtp" { - ch, err := w.gb28181Core.GetChannel(c.Request.Context(), in.Stream) - if err != nil { - w.log.WarnContext(c.Request.Context(), "webhook onStreamNoneReader", "err", err) - return onStreamNoneReaderOutput{Close: true}, nil - } - _ = w.gbs.StopPlay(&gbs.StopPlayInput{Channel: ch}) - } else if strings.HasPrefix(in.Stream, bz.IDPrefixRTSP) { - } // 存在录像计划时,不关闭流 return onStreamNoneReaderOutput{Close: true}, nil } @@ -179,86 +163,18 @@ func (w WebHookAPI) onRTPServerTimeout(c *gin.Context, in *onRTPServerTimeoutInp func (w WebHookAPI) onStreamNotFound(c *gin.Context, in *onStreamNotFoundInput) (DefaultOutput, error) { w.log.InfoContext(c.Request.Context(), "webhook onStreamNotFound", "app", in.App, "stream", in.Stream, "schema", in.Schema, "mediaServerID", in.MediaServerID) + // 防止重复触发 + if in.Schema != "rtmp" { + return newDefaultOutputOK(), nil + } - // 国标流处理 - if in.App == "rtp" { - // 防止重复触发 - if in.Schema != "rtmp" { - return newDefaultOutputOK(), nil - } - v := RTPStream{uc: w.uc} - if err := v.onStreamNotFound(c.Request.Context(), in); err != nil { - slog.ErrorContext(c.Request.Context(), "webhook onStreamNotFound", "err", err) - } - } else if strings.HasPrefix(in.Stream, bz.IDPrefixRTSP) { - v := RTSPStream{uc: w.uc} - if err := v.onStreamNotFound(c.Request.Context(), in); err != nil { + r := ipc.GetType(in.Stream) + protocol, ok := w.protocols[r] + if ok { + if err := protocol.OnStreamNotFound(c.Request.Context(), in.App, in.Stream); err != nil { slog.ErrorContext(c.Request.Context(), "webhook onStreamNotFound", "err", err) } } + return newDefaultOutputOK(), nil } - -type RTPStream struct { - uc *Usecase -} - -func (r RTPStream) onStreamNotFound(ctx context.Context, in *onStreamNotFoundInput) error { - ch, err := r.uc.GB28181API.ipc.GetChannel(ctx, in.Stream) - if err != nil { - return err - } - - dev, err := r.uc.GB28181API.ipc.GetDevice(ctx, ch.DID) - if err != nil { - return err - } - - svr, err := r.uc.SMSAPI.smsCore.GetMediaServer(ctx, sms.DefaultMediaServerID) - if err != nil { - return err - } - - return r.uc.WebHookAPI.gbs.Play(&gbs.PlayInput{ - Channel: ch, - StreamMode: dev.StreamMode, - SMS: svr, - }) -} - -type RTSPStream struct { - uc *Usecase -} - -func (r RTSPStream) onStreamNotFound(ctx context.Context, in *onStreamNotFoundInput) error { - proxy, err := r.uc.ProxyAPI.proxyCore.GetStreamProxy(ctx, in.Stream) - if err != nil { - return err - } - - svr, err := r.uc.SMSAPI.smsCore.GetMediaServer(ctx, sms.DefaultMediaServerID) - if err != nil { - return err - } - resp, err := r.uc.SMSAPI.smsCore.AddStreamProxy(svr, zlm.AddStreamProxyRequest{ - Vhost: "__defaultVhost__", - App: proxy.App, - Stream: proxy.Stream, - URL: proxy.SourceURL, - RetryCount: 3, - RTPType: proxy.Transport, - TimeoutSec: 10, - EnableHLSFMP4: zlm.NewBool(true), - EnableAudio: zlm.NewBool(true), - EnableRTSP: zlm.NewBool(true), - EnableRTMP: zlm.NewBool(true), - AddMuteAudio: zlm.NewBool(true), - AutoClose: zlm.NewBool(true), - }) - if err != nil { - return err - } - // 用于关闭 - r.uc.ProxyAPI.proxyCore.EditStreamProxyKey(ctx, resp.Data.Key, proxy.ID) - return nil -} diff --git a/pkg/gbs/devices.go b/pkg/gbs/devices.go index 47f03be..87799f9 100644 --- a/pkg/gbs/devices.go +++ b/pkg/gbs/devices.go @@ -44,7 +44,7 @@ type Device struct { } func NewDevice(conn sip.Connection, d *ipc.Device) *Device { - uri, err := sip.ParseURI(fmt.Sprintf("sip:%s@%s", d.DeviceID, d.Address)) + uri, err := sip.ParseURI(fmt.Sprintf("sip:%s@%s", d.GetGB28181DeviceID(), d.Address)) if err != nil { slog.Error("parse uri", "err", err, "did", d.ID) return nil diff --git a/pkg/gbs/play.go b/pkg/gbs/play.go index 936691c..8f70cb9 100644 --- a/pkg/gbs/play.go +++ b/pkg/gbs/play.go @@ -1,6 +1,7 @@ package gbs import ( + "context" "fmt" "log/slog" "net" @@ -47,7 +48,7 @@ func (g *GB28181API) stopPlay(ch *Channel, in *StopPlayInput) error { } // StopPlay 加锁的停止播放 -func (g *GB28181API) StopPlay(in *StopPlayInput) error { +func (g *GB28181API) StopPlay(ctx context.Context, in *StopPlayInput) error { ch, ok := g.svr.memoryStorer.GetChannel(in.Channel.DeviceID, in.Channel.ChannelID) if !ok { return ErrDeviceNotExist @@ -57,7 +58,7 @@ func (g *GB28181API) StopPlay(in *StopPlayInput) error { defer ch.device.playMutex.Unlock() defer func() { - g.svr.gb.core.EditPlaying(in.Channel.DeviceID, in.Channel.ChannelID, false) + g.svr.gb.core.EditPlaying(ctx, in.Channel.DeviceID, in.Channel.ChannelID, false) }() return g.stopPlay(ch, in) } @@ -109,7 +110,7 @@ func (g *GB28181API) Play(in *PlayInput) error { return err } - g.svr.gb.core.EditPlaying(in.Channel.DeviceID, in.Channel.ChannelID, true) + g.svr.gb.core.EditPlaying(context.TODO(), in.Channel.DeviceID, in.Channel.ChannelID, true) return nil } diff --git a/pkg/gbs/register.go b/pkg/gbs/register.go index 90a1f44..223c5ef 100644 --- a/pkg/gbs/register.go +++ b/pkg/gbs/register.go @@ -9,7 +9,7 @@ import ( "unicode" "github.com/gowvp/gb28181/internal/conf" - gb28181 "github.com/gowvp/gb28181/internal/core/ipc" + "github.com/gowvp/gb28181/internal/core/ipc" "github.com/gowvp/gb28181/internal/core/sms" "github.com/gowvp/gb28181/pkg/gbs/sip" "github.com/ixugo/goddd/pkg/conc" @@ -20,7 +20,7 @@ const ignorePassword = "#" type GB28181API struct { cfg *conf.SIP - core gb28181.GBDAdapter + core ipc.Adapter catalog *sip.Collector[Channels] @@ -32,7 +32,7 @@ type GB28181API struct { sms *sms.NodeManager } -func NewGB28181API(cfg *conf.Bootstrap, store gb28181.GBDAdapter, sms *sms.NodeManager) *GB28181API { +func NewGB28181API(cfg *conf.Bootstrap, store ipc.Adapter, sms *sms.NodeManager) *GB28181API { g := GB28181API{ cfg: &cfg.Sip, core: store, @@ -56,32 +56,35 @@ func NewGB28181API(cfg *conf.Bootstrap, store gb28181.GBDAdapter, sms *sms.NodeM // } // } - ipc, ok := g.svr.memoryStorer.Load(s) + d, ok := g.svr.memoryStorer.Load(s) if ok { for _, ch := range channel { ch := Channel{ ChannelID: ch.ChannelID, - device: ipc, + device: d, } ch.init(g.cfg.Domain) - ipc.Channels.Store(ch.ChannelID, &ch) + d.Channels.Store(ch.ChannelID, &ch) } } - out := make([]*gb28181.Channel, len(channel)) + out := make([]*ipc.Channel, len(channel)) for i, ch := range channel { - out[i] = &gb28181.Channel{ + out[i] = &ipc.Channel{ DeviceID: s, ChannelID: ch.ChannelID, Name: ch.Name, IsOnline: ch.Status == "OK" || ch.Status == "ON", - Ext: gb28181.DeviceExt{ + Ext: ipc.DeviceExt{ Manufacturer: ch.Manufacturer, Model: ch.Model, }, + Type: ipc.TypeGB28181, } } - g.core.SaveChannels(out) + if err := g.core.SaveChannels(out); err != nil { + slog.Error("SaveChannels", "err", err) + } }) return &g } @@ -131,19 +134,18 @@ func (g *GB28181API) handlerRegister(ctx *sip.Context) { password = "" } - hdrs := ctx.Request.GetHeaders("Authorization") - if len(hdrs) == 0 { - resp := sip.NewResponseFromRequest("", ctx.Request, http.StatusUnauthorized, http.StatusText(http.StatusUnauthorized), nil) - resp.AppendHeader(&sip.GenericHeader{HeaderName: "WWW-Authenticate", Contents: fmt.Sprintf(`Digest realm="%s",qop="auth",nonce="%s"`, g.cfg.Domain, sip.RandString(32))}) - _ = ctx.Tx.Respond(resp) - return - } - if password != "" { + hdrs := ctx.Request.GetHeaders("Authorization") + if len(hdrs) == 0 { + resp := sip.NewResponseFromRequest("", ctx.Request, http.StatusUnauthorized, http.StatusText(http.StatusUnauthorized), nil) + resp.AppendHeader(&sip.GenericHeader{HeaderName: "WWW-Authenticate", Contents: fmt.Sprintf(`Digest realm="%s",qop="auth",nonce="%s"`, g.cfg.Domain, sip.RandString(32))}) + _ = ctx.Tx.Respond(resp) + return + } authenticateHeader := hdrs[0].(*sip.GenericHeader) auth := sip.AuthFromValue(authenticateHeader.Contents) auth.SetPassword(password) - auth.SetUsername(dev.DeviceID) + auth.SetUsername(dev.GetGB28181DeviceID()) auth.SetMethod(ctx.Request.Method()) auth.SetURI(auth.Get("uri")) if auth.CalcResponse() != auth.Get("response") { @@ -165,7 +167,7 @@ func (g *GB28181API) handlerRegister(ctx *sip.Context) { expire := ctx.GetHeader("Expires") if expire == "0" { ctx.Log.Info("设备注销") - g.logout(ctx.DeviceID, func(b *gb28181.Device) { + g.logout(ctx.DeviceID, func(b *ipc.Device) { b.IsOnline = false b.Address = ctx.Source.String() }) @@ -173,7 +175,7 @@ func (g *GB28181API) handlerRegister(ctx *sip.Context) { return } - g.login(ctx, func(b *gb28181.Device) { + g.login(ctx, func(b *ipc.Device) { b.IsOnline = true b.RegisteredAt = orm.Now() b.KeepaliveAt = orm.Now() @@ -192,11 +194,11 @@ func (g *GB28181API) handlerRegister(ctx *sip.Context) { respFn() g.QueryDeviceInfo(ctx) - _ = g.QueryCatalog(dev.DeviceID) - _ = g.QueryConfigDownloadBasic(dev.DeviceID) + _ = g.QueryCatalog(dev.GetGB28181DeviceID()) + _ = g.QueryConfigDownloadBasic(dev.GetGB28181DeviceID()) } -func (g GB28181API) login(ctx *sip.Context, fn func(d *gb28181.Device)) { +func (g GB28181API) login(ctx *sip.Context, fn func(d *ipc.Device)) { slog.Info("status change 设备上线", "device_id", ctx.DeviceID) g.svr.memoryStorer.Change(ctx.DeviceID, fn, func(d *Device) { d.conn = ctx.Request.GetConnection() @@ -205,7 +207,7 @@ func (g GB28181API) login(ctx *sip.Context, fn func(d *gb28181.Device)) { }) } -func (g GB28181API) logout(deviceID string, changeFn func(*gb28181.Device)) error { +func (g GB28181API) logout(deviceID string, changeFn func(*ipc.Device)) error { slog.Info("status change 设备离线", "device_id", deviceID) return g.svr.memoryStorer.Change(deviceID, changeFn, func(d *Device) { d.Expires = 0 diff --git a/pkg/gbs/server.go b/pkg/gbs/server.go index 0833c6b..64fb02d 100644 --- a/pkg/gbs/server.go +++ b/pkg/gbs/server.go @@ -11,7 +11,8 @@ import ( "time" "github.com/gowvp/gb28181/internal/conf" - gb28181 "github.com/gowvp/gb28181/internal/core/ipc" + "github.com/gowvp/gb28181/internal/core/bz" + "github.com/gowvp/gb28181/internal/core/ipc" "github.com/gowvp/gb28181/internal/core/sms" "github.com/gowvp/gb28181/pkg/gbs/m" "github.com/gowvp/gb28181/pkg/gbs/sip" @@ -24,13 +25,13 @@ type MemoryStorer interface { LoadDeviceToMemory(conn sip.Connection) // 加载设备到内存 RangeDevices(fn func(key string, value *Device) bool) // 遍历设备 - Change(deviceID string, changeFn func(*gb28181.Device), changeFn2 func(*Device)) error // 登出设备 + Change(deviceID string, changeFn func(*ipc.Device), changeFn2 func(*Device)) error // 登出设备 Load(deviceID string) (*Device, bool) Store(deviceID string, value *Device) GetChannel(deviceID, channelID string) (*Channel, bool) - // Change(deviceID string, changeFn func(*gb28181.Device)) // 修改设备 + // Change(deviceID string, changeFn func(*ipc.Device)) // 修改设备 } type Server struct { @@ -42,7 +43,7 @@ type Server struct { memoryStorer MemoryStorer } -func NewServer(cfg *conf.Bootstrap, store gb28181.GBDAdapter, sc sms.Core) (*Server, func()) { +func NewServer(cfg *conf.Bootstrap, store ipc.Adapter, sc sms.Core) (*Server, func()) { api := NewGB28181API(cfg, store, sc.NodeManager) iip := ip.InternalIP() @@ -90,18 +91,20 @@ func NewServer(cfg *conf.Bootstrap, store gb28181.GBDAdapter, sc sms.Core) (*Ser func (s *Server) startTickerCheck() { conc.Timer(context.Background(), 60*time.Second, time.Second, func() { now := time.Now() - s.memoryStorer.RangeDevices(func(key string, ipc *Device) bool { - if !ipc.IsOnline { + s.memoryStorer.RangeDevices(func(key string, dev *Device) bool { + if !dev.IsOnline { return true } - - timeout := time.Duration(ipc.keepaliveTimeout) * time.Duration(ipc.keepaliveInterval) * time.Second + if !bz.IsGB28181(key) { + return true + } + timeout := time.Duration(dev.keepaliveTimeout) * time.Duration(dev.keepaliveInterval) * time.Second if timeout <= 0 { timeout = 3 * 60 * time.Second } - if sub := now.Sub(ipc.LastKeepaliveAt); sub >= timeout || ipc.conn == nil { - s.gb.logout(key, func(d *gb28181.Device) { + if sub := now.Sub(dev.LastKeepaliveAt); sub >= timeout || dev.conn == nil { + s.gb.logout(key, func(d *ipc.Device) { d.IsOnline = false }) } @@ -212,8 +215,8 @@ func (s *Server) Play(in *PlayInput) error { return s.gb.Play(in) } -func (s *Server) StopPlay(in *StopPlayInput) error { - return s.gb.StopPlay(in) +func (s *Server) StopPlay(ctx context.Context, in *StopPlayInput) error { + return s.gb.StopPlay(ctx, in) } // QuerySnapshot 厂商实现抓图的少,sip 层已实现,先搁置