feat: add sei plugin

This commit is contained in:
langhuihui
2024-10-03 15:19:50 +08:00
parent 0309298ab0
commit fd51d6df39
32 changed files with 885 additions and 162 deletions
+5 -5
View File
@@ -20,13 +20,13 @@ func main() {
}
```
## build tags
## with sqlite
| Build Tag | Description |
|-----------|-------------|
| disable_rm | Disables the memory pool |
| sqlite | Enables the sqlite DB |
```shell
go build -tags sqlite -o monibuca_sqlite
./monibuca_sqlite -c config.yaml
```
## More Example
+6
View File
@@ -18,6 +18,12 @@ func main() {
}
```
## 构建标签
| 标签 | 描述 |
|-----------|-------------|
| disable_rm | 禁用内存池 |
| sqlite | 启用 sqlite |
## 更多示例
-4
View File
@@ -8,7 +8,3 @@ gb28181:
onsub:
pull:
.* : $0
#rtsp:
# tcp:
# listenaddr: :10554
Binary file not shown.
+7
View File
@@ -0,0 +1,7 @@
global:
loglevel: debug
rtsp:
pull:
live/test: rtsp://154.12.27.148:30002/main
+1 -1
View File
@@ -2,4 +2,4 @@ global:
loglevel: trace
flv:
pull:
live/test: dump.flv
live/test: record/live/test/sei.flv
-12
View File
@@ -1,12 +0,0 @@
global:
loglevel: trace
record:
# db:
# dbtype: duckdb
onsub:
pull:
^vod/(.+)$: record/$1
onpub:
record:
^live/.+$:
filepath: record/$0
+13
View File
@@ -0,0 +1,13 @@
global:
loglevel: debug
sei:
# loglevel: trace
onpub:
transform:
.+:
output:
- target: $0/sei
flv:
pull:
live/test: /Users/dexter/Movies/jb-demo.flv
+6 -4
View File
@@ -94,13 +94,15 @@ func (frame *AVFrame) Demux(codecCtx codec.ICodecCtx) (err error) {
return
}
func (df *DataFrame) StartWrite() bool {
func (df *DataFrame) StartWrite() (success bool) {
if df.discard {
return
}
if df.TryLock() {
return true
} else {
df.discard = true
return false
}
df.discard = true
return
}
func (df *DataFrame) Ready() {
+35 -5
View File
@@ -48,7 +48,17 @@ func (r *RawAudio) Parse(track *AVTrack) (err error) {
}
func (r *RawAudio) ConvertCtx(ctx codec.ICodecCtx) (codec.ICodecCtx, IAVFrame, error) {
return ctx.GetBase(), nil, nil
c := ctx.GetBase()
if c.FourCC().Is(codec.FourCC_MP4A) {
seq := &RawAudio{
FourCC: codec.FourCC_MP4A,
Timestamp: r.Timestamp,
}
seq.SetAllocator(r.GetAllocator())
seq.Memory.Append(c.GetRecord())
return c, seq, nil
}
return c, nil, nil
}
func (r *RawAudio) Demux(ctx codec.ICodecCtx) (any, error) {
@@ -111,8 +121,10 @@ func (h *H26xFrame) Parse(track *AVTrack) (err error) {
}
case h264parser.NALU_PPS:
ctx.RecordInfo.PPS = [][]byte{nalu.ToBytes()}
ctx.Record = make([]byte, ctx.RecordInfo.Len())
ctx.RecordInfo.Marshal(ctx.Record)
ctx.CodecData, err = h264parser.NewCodecDataFromSPSAndPPS(ctx.SPS(), ctx.PPS())
if err != nil {
return
}
case codec.NALU_IDR_Picture:
track.Value.IDR = true
}
@@ -135,8 +147,7 @@ func (h *H26xFrame) Parse(track *AVTrack) (err error) {
}
case h265parser.NAL_UNIT_PPS:
ctx.RecordInfo.PPS = [][]byte{nalu.ToBytes()}
ctx.Record = make([]byte, ctx.RecordInfo.Len())
ctx.RecordInfo.Marshal(ctx.Record, ctx.SPSInfo)
ctx.CodecData, err = h265parser.NewCodecDataFromVPSAndSPSAndPPS(ctx.VPS(), ctx.SPS(), ctx.PPS())
case h265parser.NAL_UNIT_CODED_SLICE_BLA_W_LP,
h265parser.NAL_UNIT_CODED_SLICE_BLA_W_RADL,
h265parser.NAL_UNIT_CODED_SLICE_BLA_N_LP,
@@ -151,6 +162,25 @@ func (h *H26xFrame) Parse(track *AVTrack) (err error) {
}
func (h *H26xFrame) ConvertCtx(ctx codec.ICodecCtx) (codec.ICodecCtx, IAVFrame, error) {
switch c := ctx.GetBase().(type) {
case *codec.H264Ctx:
return c, &H26xFrame{
FourCC: codec.FourCC_H264,
Nalus: []util.Memory{
util.NewMemory(c.SPS()),
util.NewMemory(c.PPS()),
},
}, nil
case *codec.H265Ctx:
return c, &H26xFrame{
FourCC: codec.FourCC_H265,
Nalus: []util.Memory{
util.NewMemory(c.VPS()),
util.NewMemory(c.SPS()),
util.NewMemory(c.PPS()),
},
}, nil
}
return ctx.GetBase(), nil, nil
}
+4 -3
View File
@@ -3,10 +3,11 @@ package pkg
import (
"fmt"
"log/slog"
"m7s.live/m7s/v5/pkg/task"
"m7s.live/m7s/v5/pkg/util"
"sync"
"time"
"m7s.live/m7s/v5/pkg/task"
"m7s.live/m7s/v5/pkg/util"
)
type RingWriter struct {
@@ -66,7 +67,7 @@ func (rb *RingWriter) reduce(size int) {
r := rb.Unlink(size)
rb.Size -= size
for range size {
if r.Value.TryLock() {
if r.Value.StartWrite() {
rb.poolSize++
r.Value.Reset()
r.Value.Unlock()
+11 -10
View File
@@ -98,7 +98,7 @@ func (mt *Job) AddTask(t ITask, opt ...any) (task *Task) {
switch t.(type) {
case TaskStarter, TaskBlock, TaskGo:
// need start now
default:
case IJob:
// lazy start
return
}
@@ -137,18 +137,19 @@ func (mt *Job) AddTask(t ITask, opt ...any) (task *Task) {
return
}
func (mt *Job) Call(callback func() error) {
mt.Post(callback).WaitStarted()
func (mt *Job) Call(callback func() error, args ...any) {
mt.Post(callback, args...).WaitStarted()
}
func (mt *Job) Post(callback func() error) *Task {
func (mt *Job) Post(callback func() error, args ...any) *Task {
task := CreateTaskByCallBack(callback, nil)
return mt.AddTask(task)
}
func (mt *Job) addChild(task ITask) int {
mt.children = append(mt.children, task)
return len(mt.children) - 1
description := make(Description)
if len(args) > 0 {
description["ownerType"] = args[0]
} else {
description = nil
}
return mt.AddTask(task, description)
}
func (mt *Job) run() {
+5 -4
View File
@@ -75,8 +75,8 @@ type (
RangeSubTask(func(yield ITask) bool)
OnChildDispose(func(ITask))
Blocked() bool
Call(func() error)
Post(func() error) *Task
Call(func() error, ...any)
Post(func() error, ...any) *Task
}
IChannelTask interface {
ITask
@@ -99,8 +99,9 @@ type (
RetryCount int
RetryInterval time.Duration
}
Description = map[string]any
Task struct {
Description = map[string]any
TaskContextKey string
Task struct {
ID uint32
StartTime time.Time
*slog.Logger
+7 -2
View File
@@ -84,7 +84,7 @@ type (
}
IDevicePlugin interface {
OnDeviceAdd(device *Device) task.ITask
OnDeviceAdd(device *Device) task.ITask
}
)
@@ -383,7 +383,12 @@ func (p *Plugin) OnPublish(pub *Publisher) {
}
}
}
if p.Meta.Transformer != nil {
var owner = pub.Value(Owner)
var isTransformer bool
if owner != nil {
_, isTransformer = owner.(ITransformer)
}
if p.Meta.Transformer != nil && !isTransformer {
for r, tranConf := range onPublish.Transform {
if group := r.FindStringSubmatch(pub.StreamPath); group != nil {
for j, to := range tranConf.Output {
+15 -2
View File
@@ -197,7 +197,7 @@ message MyResponse {
```shell
protoc -I. -I$ProjectFileDir$/pb --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative --grpc-gateway_out=. --grpc-gateway_opt=paths=source_relative myplugin.proto
```
把其中的 $ProjectFileDir$ 替换成包含全局 pb 的目录,全局 pb 文件就在 monibuca 项目的 pb 目录下。
把其中的 `$ProjectFileDir$` 替换成包含全局 pb 的目录,全局 pb 文件就在 monibuca 项目的 pb 目录下。
### 实现gRPC服务
创建 api.go 文件
@@ -290,4 +290,17 @@ suber, err = p.Subscribe(ctx,streamPath)
go m7s.PlayBlock(suber, handleAudio, handleVideo)
```
这里需要注意的是 handleAudio, handleVideo 是处理音视频数据的回调函数,需要自己实现。
handleAudio/Video 的入参是一个你需要接受到的音视频格式类型,返回 error,如果返回的 error 不是 nil,则订阅中止。
handleAudio/Video 的入参是一个你需要接受到的音视频格式类型,返回 error,如果返回的 error 不是 nil,则订阅中止。
## 7. 接入 Prometheus
只需要实现 Collector 接口,系统会自动收集所有插件的指标信息。
```go
func (p *MyPlugin) Describe(ch chan<- *prometheus.Desc) {
}
func (p *MyPlugin) Collect(ch chan<- prometheus.Metric) {
}
```
+3 -7
View File
@@ -119,16 +119,12 @@ func (avcc *RTMPVideo) Parse(t *AVTrack) (err error) {
}
func (avcc *RTMPVideo) ConvertCtx(from codec.ICodecCtx) (to codec.ICodecCtx, seq IAVFrame, err error) {
var b util.Buffer
var enhanced = true //TODO
switch fourCC := from.FourCC(); fourCC {
case codec.FourCC_H264:
h264ctx := from.GetBase().(*codec.H264Ctx)
b = make(util.Buffer, h264ctx.RecordInfo.Len()+5)
b[0] = 0x17
h264ctx.RecordInfo.Marshal(b[5:])
var seqFrame RTMPData
seqFrame.AppendOne(b)
seqFrame.AppendOne(append([]byte{0x17, 0, 0, 0, 0}, h264ctx.Record...))
//if t.Enabled(context.TODO(), TraceLevel) {
// c := t.FourCC().String()
// size := seqFrame.GetSize()
@@ -138,14 +134,14 @@ func (avcc *RTMPVideo) ConvertCtx(from codec.ICodecCtx) (to codec.ICodecCtx, seq
return h264ctx, seqFrame.WrapVideo(), err
case codec.FourCC_H265:
h265ctx := from.GetBase().(*codec.H265Ctx)
b = make(util.Buffer, h265ctx.RecordInfo.Len()+5)
b := make(util.Buffer, len(h265ctx.Record)+5)
if enhanced {
b[0] = 0b1001_0000 | byte(PacketTypeSequenceStart)
copy(b[1:], codec.FourCC_H265[:])
} else {
b[0], b[1], b[2], b[3], b[4] = 0x1C, 0, 0, 0, 0
}
h265ctx.RecordInfo.Marshal(b[5:], h265parser.SPSInfo{})
copy(b[5:], h265ctx.Record)
var ctx H265Ctx
ctx.Enhanced = enhanced
ctx.H265Ctx = *h265ctx
+92 -5
View File
@@ -5,13 +5,14 @@ import (
"encoding/binary"
"encoding/hex"
"fmt"
"github.com/bluenviron/mediacommon/pkg/bits"
"github.com/deepch/vdk/codec/aacparser"
"io"
"strings"
"time"
"unsafe"
"github.com/bluenviron/mediacommon/pkg/bits"
"github.com/deepch/vdk/codec/aacparser"
"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
. "m7s.live/m7s/v5/pkg"
@@ -216,10 +217,26 @@ func (r *RTPAudio) Parse(t *AVTrack) (err error) {
var ctx PCMUCtx
ctx.parseFmtpLine(r.RTPCodecParameters)
t.ICodecCtx = &ctx
case "audio/MP4A-LATM":
var ctx *AACCtx
if t.ICodecCtx != nil {
// ctx = t.ICodecCtx.(*AACCtx)
} else {
ctx = &AACCtx{}
ctx.parseFmtpLine(r.RTPCodecParameters)
if conf, ok := ctx.Fmtp["config"]; ok {
if ctx.AACCtx.ConfigBytes, err = hex.DecodeString(conf); err == nil {
if ctx.CodecData, err = aacparser.NewCodecDataFromMPEG4AudioConfigBytes(ctx.AACCtx.ConfigBytes); err != nil {
return
}
}
}
t.ICodecCtx = ctx
}
case "audio/MPEG4-GENERIC":
var ctx *AACCtx
if t.ICodecCtx != nil {
ctx = t.ICodecCtx.(*AACCtx)
// ctx = t.ICodecCtx.(*AACCtx)
} else {
ctx = &AACCtx{}
ctx.parseFmtpLine(r.RTPCodecParameters)
@@ -239,10 +256,80 @@ func (r *RTPAudio) Parse(t *AVTrack) (err error) {
return
}
func payloadLengthInfoDecode(buf []byte) (int, int, error) {
lb := len(buf)
l := 0
n := 0
for {
if (lb - n) == 0 {
return 0, 0, fmt.Errorf("not enough bytes")
}
b := buf[n]
n++
l += int(b)
if b != 255 {
break
}
}
return l, n, nil
}
func (r *RTPAudio) Demux(codexCtx codec.ICodecCtx) (any, error) {
var data AudioData
switch codexCtx.(type) {
case *AACCtx:
switch r.MimeType {
case "audio/MP4A-LATM":
var fragments util.Memory
var fragmentsExpected int
var fragmentsSize int
for _, packet := range r.Packets {
buf := packet.Payload
if fragments.Size == 0 {
pl, n, err := payloadLengthInfoDecode(buf)
if err != nil {
return nil, err
}
buf = buf[n:]
bl := len(buf)
if pl <= bl {
data.AppendOne(buf[:pl])
// there could be other data, due to otherDataPresent. Ignore it.
} else {
if pl > 5*1024 {
fragments = util.Memory{} // discard pending fragments
return nil, fmt.Errorf("access unit size (%d) is too big, maximum is %d",
pl, 5*1024)
}
fragments.AppendOne(buf)
fragmentsSize = pl
fragmentsExpected = pl - bl
continue
}
} else {
bl := len(buf)
if fragmentsExpected > bl {
fragments.AppendOne(buf)
fragmentsExpected -= bl
continue
}
fragments.AppendOne(buf[:fragmentsExpected])
// there could be other data, due to otherDataPresent. Ignore it.
data.Append(fragments.Buffers...)
if fragments.Size != fragmentsSize {
return nil, fmt.Errorf("fragmented AU size is not correct %d != %d", data.Size, fragmentsSize)
}
fragments = util.Memory{}
}
}
case "audio/MPEG4-GENERIC":
var fragments util.Memory
for _, packet := range r.Packets {
if len(packet.Payload) < 2 {
+4 -2
View File
@@ -4,10 +4,11 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"github.com/pion/sdp/v3"
"strconv"
"strings"
"unicode"
"github.com/pion/sdp/v3"
)
const (
@@ -29,6 +30,7 @@ const (
CodecPCMU = "PCMU" // payloadType: 0
CodecPCMA = "PCMA" // payloadType: 8
CodecAAC = "MPEG4-GENERIC"
CodecLATM = "MP4A-LATM"
CodecOpus = "OPUS" // payloadType: 111
CodecG722 = "G722"
CodecMP3 = "MPA" // payload: 14, aka MPEG-1 Layer III
@@ -115,7 +117,7 @@ func FFmpegCodecName(name string) string {
return "pcm_s16be"
case CodecPCML:
return "pcm_s16le"
case CodecAAC:
case CodecAAC, CodecLATM:
return "aac"
case CodecOpus:
return "opus"
+1 -1
View File
@@ -138,7 +138,7 @@ func (c *NetConnection) Connect(remoteURL string) (err error) {
c.URL = rtspURL
c.UserAgent = "monibuca" + m7s.Version
c.auth = util.NewAuth(c.URL.User)
c.Backchannel = true
// c.Backchannel = true
return
}
+3 -2
View File
@@ -3,8 +3,9 @@ package rtsp
import (
"encoding/json"
"fmt"
"github.com/pion/sdp/v3"
"strings"
"github.com/pion/sdp/v3"
)
// Media take best from:
@@ -93,7 +94,7 @@ func GetKind(name string) string {
switch name {
case CodecH264, CodecH265, CodecVP8, CodecVP9, CodecAV1, CodecJPEG, CodecRAW:
return KindVideo
case CodecPCMU, CodecPCMA, CodecAAC, CodecOpus, CodecG722, CodecMP3, CodecPCM, CodecPCML, CodecELD, CodecFLAC:
case CodecPCMU, CodecPCMA, CodecAAC, CodecLATM, CodecOpus, CodecG722, CodecMP3, CodecPCM, CodecPCML, CodecELD, CodecFLAC:
return KindAudio
}
return ""
+52
View File
@@ -0,0 +1,52 @@
package plugin_sei
import (
"context"
"errors"
globalPB "m7s.live/m7s/v5/pb"
"m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/config"
pb "m7s.live/m7s/v5/plugin/sei/pb"
sei "m7s.live/m7s/v5/plugin/sei/pkg"
)
func (conf *SEIPlugin) Insert(ctx context.Context, req *pb.InsertRequest) (*globalPB.SuccessResponse, error) {
streamPath := req.StreamPath
targetStreamPath := req.TargetStreamPath
if targetStreamPath == "" {
targetStreamPath = streamPath + "/sei"
}
ok := conf.Server.Streams.Has(streamPath)
if !ok {
return nil, pkg.ErrNotFound
}
var transformer *sei.Transformer
if tm, ok := conf.Server.Transforms.Transformed.Get(targetStreamPath); ok {
transformer, ok = tm.TransformJob.Transformer.(*sei.Transformer)
if !ok {
return nil, errors.New("targetStreamPath is not a sei transformer")
}
} else {
transformer = sei.NewTransform().(*sei.Transformer)
transformer.TransformJob.Init(transformer, &conf.Plugin, streamPath, config.Transform{
Output: []config.TransfromOutput{
{
Target: targetStreamPath,
StreamPath: targetStreamPath,
},
},
}).WaitStarted()
}
t := req.Type
transformer.AddSEI(byte(t), req.Data)
err := transformer.WaitStarted()
if err != nil {
return nil, err
}
return &globalPB.SuccessResponse{
Code: 0,
Message: "success",
}, nil
}
+3 -62
View File
@@ -1,73 +1,14 @@
package plugin_sei
import (
"io"
"net/http"
"strconv"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg/config"
"m7s.live/m7s/v5/pkg/util"
"m7s.live/m7s/v5/plugin/sei/pb"
sei "m7s.live/m7s/v5/plugin/sei/pkg"
)
var _ = m7s.InstallPlugin[SEIPlugin](sei.NewTransform)
var _ = m7s.InstallPlugin[SEIPlugin](sei.NewTransform, pb.RegisterApiServer, &pb.Api_ServiceDesc)
type SEIPlugin struct {
pb.UnimplementedApiServer
m7s.Plugin
}
func (conf *SEIPlugin) RegisterHandler() map[string]http.HandlerFunc {
return map[string]http.HandlerFunc{
"/api/insert/{streamPath...}": conf.api_insert,
}
}
func (conf *SEIPlugin) api_insert(w http.ResponseWriter, r *http.Request) {
q := r.URL.Query()
streamPath := r.PathValue("streamPath")
targetStreamPath := q.Get("targetStreamPath")
if targetStreamPath == "" {
targetStreamPath = streamPath + "/sei"
}
ok := conf.Server.Streams.Has(streamPath)
if !ok {
util.ReturnError(util.APIErrorNoStream, streamPath+" not found", w, r)
return
}
var transformer *sei.Transformer
if tm, ok := conf.Server.Transforms.Transformed.Get(targetStreamPath); ok {
transformer, ok = tm.TransformJob.Transformer.(*sei.Transformer)
if !ok {
util.ReturnError(util.APIErrorPublish, "targetStreamPath is not a sei transformer", w, r)
return
}
} else {
transformer = sei.NewTransform().(*sei.Transformer)
conf.Transform(streamPath, config.Transform{
Output: []config.TransfromOutput{
{
Target: targetStreamPath,
StreamPath: targetStreamPath,
},
},
})
}
t := q.Get("type")
tb, err := strconv.ParseInt(t, 10, 8)
if err != nil {
if t == "" {
tb = 5
} else {
util.ReturnError(util.APIErrorQueryParse, "type must a number", w, r)
return
}
}
sei, err := io.ReadAll(r.Body)
if err != nil {
util.ReturnError(util.APIErrorNoBody, err.Error(), w, r)
return
}
transformer.AddSEI(byte(tb), sei)
util.ReturnOK(w, r)
}
+187
View File
@@ -0,0 +1,187 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.1
// protoc v3.19.1
// source: sei.proto
package pb
import (
_ "google.golang.org/genproto/googleapis/api/annotations"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
pb "m7s.live/m7s/v5/pb"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type InsertRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
StreamPath string `protobuf:"bytes,1,opt,name=streamPath,proto3" json:"streamPath,omitempty"`
Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
Type uint32 `protobuf:"varint,3,opt,name=type,proto3" json:"type,omitempty"`
TargetStreamPath string `protobuf:"bytes,4,opt,name=targetStreamPath,proto3" json:"targetStreamPath,omitempty"`
}
func (x *InsertRequest) Reset() {
*x = InsertRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_sei_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *InsertRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*InsertRequest) ProtoMessage() {}
func (x *InsertRequest) ProtoReflect() protoreflect.Message {
mi := &file_sei_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use InsertRequest.ProtoReflect.Descriptor instead.
func (*InsertRequest) Descriptor() ([]byte, []int) {
return file_sei_proto_rawDescGZIP(), []int{0}
}
func (x *InsertRequest) GetStreamPath() string {
if x != nil {
return x.StreamPath
}
return ""
}
func (x *InsertRequest) GetData() []byte {
if x != nil {
return x.Data
}
return nil
}
func (x *InsertRequest) GetType() uint32 {
if x != nil {
return x.Type
}
return 0
}
func (x *InsertRequest) GetTargetStreamPath() string {
if x != nil {
return x.TargetStreamPath
}
return ""
}
var File_sei_proto protoreflect.FileDescriptor
var file_sei_proto_rawDesc = []byte{
0x0a, 0x09, 0x73, 0x65, 0x69, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x03, 0x73, 0x65, 0x69,
0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x61, 0x6e, 0x6e,
0x6f, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x0c,
0x67, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x83, 0x01, 0x0a,
0x0d, 0x49, 0x6e, 0x73, 0x65, 0x72, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1e,
0x0a, 0x0a, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x0a, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x12, 0x12,
0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61,
0x74, 0x61, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d,
0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x2a, 0x0a, 0x10, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74,
0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09,
0x52, 0x10, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61,
0x74, 0x68, 0x32, 0x6b, 0x0a, 0x03, 0x61, 0x70, 0x69, 0x12, 0x64, 0x0a, 0x06, 0x69, 0x6e, 0x73,
0x65, 0x72, 0x74, 0x12, 0x12, 0x2e, 0x73, 0x65, 0x69, 0x2e, 0x49, 0x6e, 0x73, 0x65, 0x72, 0x74,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x67, 0x6c, 0x6f, 0x62, 0x61, 0x6c,
0x2e, 0x53, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
0x22, 0x2d, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x27, 0x22, 0x1f, 0x2f, 0x73, 0x65, 0x69, 0x2f, 0x61,
0x70, 0x69, 0x2f, 0x69, 0x6e, 0x73, 0x65, 0x72, 0x74, 0x2f, 0x7b, 0x73, 0x74, 0x72, 0x65, 0x61,
0x6d, 0x50, 0x61, 0x74, 0x68, 0x3d, 0x2a, 0x2a, 0x7d, 0x3a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x42,
0x1f, 0x5a, 0x1d, 0x6d, 0x37, 0x73, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x2f, 0x6d, 0x37, 0x73, 0x2f,
0x76, 0x35, 0x2f, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2f, 0x73, 0x65, 0x69, 0x2f, 0x70, 0x62,
0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_sei_proto_rawDescOnce sync.Once
file_sei_proto_rawDescData = file_sei_proto_rawDesc
)
func file_sei_proto_rawDescGZIP() []byte {
file_sei_proto_rawDescOnce.Do(func() {
file_sei_proto_rawDescData = protoimpl.X.CompressGZIP(file_sei_proto_rawDescData)
})
return file_sei_proto_rawDescData
}
var file_sei_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
var file_sei_proto_goTypes = []interface{}{
(*InsertRequest)(nil), // 0: sei.InsertRequest
(*pb.SuccessResponse)(nil), // 1: global.SuccessResponse
}
var file_sei_proto_depIdxs = []int32{
0, // 0: sei.api.insert:input_type -> sei.InsertRequest
1, // 1: sei.api.insert:output_type -> global.SuccessResponse
1, // [1:2] is the sub-list for method output_type
0, // [0:1] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_sei_proto_init() }
func file_sei_proto_init() {
if File_sei_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_sei_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*InsertRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_sei_proto_rawDesc,
NumEnums: 0,
NumMessages: 1,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_sei_proto_goTypes,
DependencyIndexes: file_sei_proto_depIdxs,
MessageInfos: file_sei_proto_msgTypes,
}.Build()
File_sei_proto = out.File
file_sei_proto_rawDesc = nil
file_sei_proto_goTypes = nil
file_sei_proto_depIdxs = nil
}
+215
View File
@@ -0,0 +1,215 @@
// Code generated by protoc-gen-grpc-gateway. DO NOT EDIT.
// source: sei.proto
/*
Package pb is a reverse proxy.
It translates gRPC into RESTful JSON APIs.
*/
package pb
import (
"context"
"io"
"net/http"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/grpc-ecosystem/grpc-gateway/v2/utilities"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)
// Suppress "imported and not used" errors
var _ codes.Code
var _ io.Reader
var _ status.Status
var _ = runtime.String
var _ = utilities.NewDoubleArray
var _ = metadata.Join
var (
filter_Api_Insert_0 = &utilities.DoubleArray{Encoding: map[string]int{"data": 0, "streamPath": 1}, Base: []int{1, 1, 2, 0, 0}, Check: []int{0, 1, 1, 2, 3}}
)
func request_Api_Insert_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq InsertRequest
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq.Data); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
var (
val string
ok bool
err error
_ = err
)
val, ok = pathParams["streamPath"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "streamPath")
}
protoReq.StreamPath, err = runtime.String(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "streamPath", err)
}
if err := req.ParseForm(); err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
if err := runtime.PopulateQueryParameters(&protoReq, req.Form, filter_Api_Insert_0); err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := client.Insert(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Api_Insert_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq InsertRequest
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq.Data); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
var (
val string
ok bool
err error
_ = err
)
val, ok = pathParams["streamPath"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "streamPath")
}
protoReq.StreamPath, err = runtime.String(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "streamPath", err)
}
if err := req.ParseForm(); err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
if err := runtime.PopulateQueryParameters(&protoReq, req.Form, filter_Api_Insert_0); err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := server.Insert(ctx, &protoReq)
return msg, metadata, err
}
// RegisterApiHandlerServer registers the http handlers for service Api to "mux".
// UnaryRPC :call ApiServer directly.
// StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906.
// Note that using this registration option will cause many gRPC library features to stop working. Consider using RegisterApiHandlerFromEndpoint instead.
func RegisterApiHandlerServer(ctx context.Context, mux *runtime.ServeMux, server ApiServer) error {
mux.Handle("POST", pattern_Api_Insert_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/sei.Api/Insert", runtime.WithHTTPPathPattern("/sei/api/insert/{streamPath=**}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Api_Insert_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_Insert_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
return nil
}
// RegisterApiHandlerFromEndpoint is same as RegisterApiHandler but
// automatically dials to "endpoint" and closes the connection when "ctx" gets done.
func RegisterApiHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) (err error) {
conn, err := grpc.DialContext(ctx, endpoint, opts...)
if err != nil {
return err
}
defer func() {
if err != nil {
if cerr := conn.Close(); cerr != nil {
grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr)
}
return
}
go func() {
<-ctx.Done()
if cerr := conn.Close(); cerr != nil {
grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr)
}
}()
}()
return RegisterApiHandler(ctx, mux, conn)
}
// RegisterApiHandler registers the http handlers for service Api to "mux".
// The handlers forward requests to the grpc endpoint over "conn".
func RegisterApiHandler(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error {
return RegisterApiHandlerClient(ctx, mux, NewApiClient(conn))
}
// RegisterApiHandlerClient registers the http handlers for service Api
// to "mux". The handlers forward requests to the grpc endpoint over the given implementation of "ApiClient".
// Note: the gRPC framework executes interceptors within the gRPC handler. If the passed in "ApiClient"
// doesn't go through the normal gRPC flow (creating a gRPC client etc.) then it will be up to the passed in
// "ApiClient" to call the correct interceptors.
func RegisterApiHandlerClient(ctx context.Context, mux *runtime.ServeMux, client ApiClient) error {
mux.Handle("POST", pattern_Api_Insert_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/sei.Api/Insert", runtime.WithHTTPPathPattern("/sei/api/insert/{streamPath=**}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Api_Insert_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_Insert_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
return nil
}
var (
pattern_Api_Insert_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 3, 0, 4, 1, 5, 3}, []string{"sei", "api", "insert", "streamPath"}, ""))
)
var (
forward_Api_Insert_0 = runtime.ForwardResponseMessage
)
+22
View File
@@ -0,0 +1,22 @@
syntax = "proto3";
import "google/api/annotations.proto";
//import "google/protobuf/empty.proto";
import "global.proto";
package sei;
option go_package="m7s.live/m7s/v5/plugin/sei/pb";
service api {
rpc insert (InsertRequest) returns (global.SuccessResponse) {
option (google.api.http) = {
post: "/sei/api/insert/{streamPath=**}"
body: "data"
};
}
}
message InsertRequest {
string streamPath = 1;
bytes data = 2;
uint32 type = 3;
string targetStreamPath = 4;
}
+106
View File
@@ -0,0 +1,106 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.19.1
// source: sei.proto
package pb
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
pb "m7s.live/m7s/v5/pb"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// ApiClient is the client API for Api service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type ApiClient interface {
Insert(ctx context.Context, in *InsertRequest, opts ...grpc.CallOption) (*pb.SuccessResponse, error)
}
type apiClient struct {
cc grpc.ClientConnInterface
}
func NewApiClient(cc grpc.ClientConnInterface) ApiClient {
return &apiClient{cc}
}
func (c *apiClient) Insert(ctx context.Context, in *InsertRequest, opts ...grpc.CallOption) (*pb.SuccessResponse, error) {
out := new(pb.SuccessResponse)
err := c.cc.Invoke(ctx, "/sei.api/insert", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// ApiServer is the server API for Api service.
// All implementations must embed UnimplementedApiServer
// for forward compatibility
type ApiServer interface {
Insert(context.Context, *InsertRequest) (*pb.SuccessResponse, error)
mustEmbedUnimplementedApiServer()
}
// UnimplementedApiServer must be embedded to have forward compatible implementations.
type UnimplementedApiServer struct {
}
func (UnimplementedApiServer) Insert(context.Context, *InsertRequest) (*pb.SuccessResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Insert not implemented")
}
func (UnimplementedApiServer) mustEmbedUnimplementedApiServer() {}
// UnsafeApiServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to ApiServer will
// result in compilation errors.
type UnsafeApiServer interface {
mustEmbedUnimplementedApiServer()
}
func RegisterApiServer(s grpc.ServiceRegistrar, srv ApiServer) {
s.RegisterService(&Api_ServiceDesc, srv)
}
func _Api_Insert_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(InsertRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ApiServer).Insert(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/sei.api/insert",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).Insert(ctx, req.(*InsertRequest))
}
return interceptor(ctx, in, info, handler)
}
// Api_ServiceDesc is the grpc.ServiceDesc for Api service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Api_ServiceDesc = grpc.ServiceDesc{
ServiceName: "sei.api",
HandlerType: (*ApiServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "insert",
Handler: _Api_Insert_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "sei.proto",
}
+36 -19
View File
@@ -1,6 +1,7 @@
package sei
import (
"github.com/deepch/vdk/codec/h265parser"
"m7s.live/m7s/v5"
"m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/codec"
@@ -9,8 +10,7 @@ import (
type Transformer struct {
m7s.DefaultTransformer
data chan util.Buffer
allocator *util.ScalableMemoryAllocator
data chan util.Buffer
}
func (t *Transformer) AddSEI(tp byte, data []byte) {
@@ -32,8 +32,7 @@ func (t *Transformer) AddSEI(tp byte, data []byte) {
func NewTransform() m7s.ITransformer {
ret := &Transformer{
data: make(chan util.Buffer, 10),
allocator: util.NewScalableMemoryAllocator(1 << util.MinPowerOf2),
data: make(chan util.Buffer, 10),
}
return ret
}
@@ -47,12 +46,11 @@ func (t *Transformer) Run() (err error) {
if err != nil {
return
}
m7s.PlayBlock(t.TransformJob.Subscriber, func(audio *pkg.RawAudio) (err error) {
return m7s.PlayBlock(t.TransformJob.Subscriber, func(audio *pkg.RawAudio) (err error) {
copyAudio := &pkg.RawAudio{
FourCC: audio.FourCC,
Timestamp: audio.Timestamp,
}
copyAudio.SetAllocator(t.allocator)
audio.Memory.Range(func(b []byte) {
copy(copyAudio.NextN(len(b)), b)
})
@@ -63,28 +61,47 @@ func (t *Transformer) Run() (err error) {
CTS: video.CTS,
Timestamp: video.Timestamp,
}
copyVideo.SetAllocator(t.allocator)
if len(t.data) > 0 {
for seiFrame := range t.data {
var seis [][]byte
continueLoop := true
for continueLoop {
select {
case seiFrame := <-t.data:
seis = append(seis, seiFrame)
default:
continueLoop = false
}
}
seiCount := len(seis)
for _, nalu := range video.Nalus {
mem := copyVideo.NextN(nalu.Size)
copy(mem, nalu.ToBytes())
if seiCount > 0 {
switch video.FourCC {
case codec.FourCC_H264:
var seiNalu util.Memory
seiNalu.Append([]byte{byte(codec.NALU_SEI)}, seiFrame)
copyVideo.Nalus = append(copyVideo.Nalus, seiNalu)
}
for _, nalu := range video.Nalus {
mem := copyVideo.NextN(nalu.Size)
copy(mem, nalu.ToBytes())
copyVideo.Nalus.Append(mem)
switch codec.ParseH264NALUType(mem[0]) {
case codec.NALU_IDR_Picture, codec.NALU_Non_IDR_Picture:
for _, sei := range seis {
copyVideo.Nalus.Append(append([]byte{byte(codec.NALU_SEI)}, sei...))
}
}
case codec.FourCC_H265:
if naluType := codec.ParseH265NALUType(mem[0]); naluType < 21 {
for _, sei := range seis {
copyVideo.Nalus.Append(append([]byte{byte(0b10000000 | byte(h265parser.NAL_UNIT_PREFIX_SEI<<1))}, sei...))
}
}
}
}
copyVideo.Nalus.Append(mem)
}
if seiCount > 0 {
t.Info("insert sei", "count", seiCount)
}
return t.TransformJob.Publisher.WriteVideo(copyVideo)
})
return
}
func (t *Transformer) Dispose() {
close(t.data)
t.allocator.Recycle()
}
+1 -6
View File
@@ -188,12 +188,7 @@ func (p *Publisher) Start() (err error) {
for plugin := range s.Plugins.Range {
plugin.OnPublish(p)
}
s.Transforms.Post(func() error {
if m, ok := s.Transforms.Transformed.Get(p.StreamPath); ok {
m.TransformJob.TransformPublished(p)
}
return nil
})
s.Transforms.PublishEvent <- p
p.AddTask(&PublishTimeout{Publisher: p})
if p.PublishTimeout > 0 {
p.AddTask(&PublishNoDataTimeout{Publisher: p})
+20 -2
View File
@@ -1,4 +1,22 @@
#!/bin/bash
protoc -I. --go_out=. --go_opt=paths=source_relative --go-grpc_out=. \
--go-grpc_opt=paths=source_relative --grpc-gateway_out=. --grpc-gateway_opt=paths=source_relative ${1}.proto
name=$(basename $(pwd))
cd pb
# Run the protoc command
protoc -I. \
-I"../../../pb" \
--go_out=. \
--go_opt=paths=source_relative \
--go-grpc_out=. \
--go-grpc_opt=paths=source_relative \
--grpc-gateway_out=. \
--grpc-gateway_opt=paths=source_relative \
"${name}.proto"
# Check if the command was successful
if [ $? -eq 0 ]; then
echo "Proto files for ${name} built successfully"
else
echo "Error building proto files for ${name}"
exit 1
fi
+6 -2
View File
@@ -96,7 +96,6 @@ type (
RawConfig = map[string]map[string]any
)
func (w *WaitStream) GetKey() string {
return w.StreamPath
}
@@ -114,6 +113,7 @@ func NewServer(conf any) (s *Server) {
"arch": sysruntime.GOARCH,
"cpus": int32(sysruntime.NumCPU()),
}
s.Transforms.PublishEvent = make(chan *Publisher, 1)
s.prometheusDesc.init()
return
}
@@ -277,6 +277,10 @@ func (s *Server) Start() (err error) {
s.Streams.OnStart(func() {
s.Streams.AddTask(&CheckSubWaitTimeout{s: s})
})
s.Transforms.OnStart(func() {
publishEvent := &TransformsPublishEvent{Transforms: &s.Transforms}
s.Transforms.AddTask(publishEvent)
})
s.Info("server started")
s.Post(func() error {
for plugin := range s.Plugins.Range {
@@ -296,7 +300,7 @@ func (s *Server) Start() (err error) {
}
}
return nil
})
}, "serverStart")
return
}
+1 -1
View File
@@ -22,7 +22,7 @@ import (
)
var AVFrameType = reflect.TypeOf((*AVFrame)(nil))
var Owner task.TaskContextKey = "owner"
type PubSubBase struct {
task.Job
Plugin *Plugin
+18 -1
View File
@@ -1,6 +1,8 @@
package m7s
import (
"context"
"m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/config"
"m7s.live/m7s/v5/pkg/task"
@@ -33,9 +35,24 @@ type (
Transforms struct {
Transformed util.Collection[string, *TransformedMap]
task.Manager[string, *TransformJob]
PublishEvent chan *Publisher
}
TransformsPublishEvent struct {
task.ChannelTask
Transforms *Transforms
}
)
func (t *TransformsPublishEvent) GetSignal() any {
return t.Transforms.PublishEvent
}
func (t *TransformsPublishEvent) Tick(pub any) {
if m, ok := t.Transforms.Transformed.Get(pub.(*Publisher).StreamPath); ok {
m.TransformJob.TransformPublished(pub.(*Publisher))
}
}
func (t *TransformedMap) GetKey() string {
return t.StreamPath
}
@@ -54,7 +71,7 @@ func (p *TransformJob) Subscribe() (err error) {
}
func (p *TransformJob) Publish(streamPath string) (err error) {
p.Publisher, err = p.Plugin.Publish(p.Transformer, streamPath)
p.Publisher, err = p.Plugin.Publish(context.WithValue(p.Transformer, Owner, p.Transformer), streamPath)
return
}