feat: flv add start record api

This commit is contained in:
pggiroro
2025-12-26 23:03:52 +08:00
parent 1bf2e4e90b
commit e5ef892739
8 changed files with 452 additions and 161 deletions
+49
View File
@@ -4,6 +4,9 @@ import (
"context"
"encoding/binary"
"errors"
task "github.com/langhuihui/gotask"
"m7s.live/v5/pkg"
"m7s.live/v5/pkg/config"
"net"
"net/http"
"time"
@@ -149,3 +152,49 @@ func (plugin *FLVPlugin) jessica(rw http.ResponseWriter, r *http.Request) {
return write(2, video.GetTS32(), video.Memory)
})
}
func (p *FLVPlugin) StartRecord(ctx context.Context, req *flvpb.ReqStartRecord) (res *flvpb.ResponseStartRecord, err error) {
var recordExists bool
var filePath = "."
var fragment = time.Minute
if req.Fragment != nil {
fragment = req.Fragment.AsDuration()
}
if req.FilePath != "" {
filePath = req.FilePath
}
res = &flvpb.ResponseStartRecord{}
_, recordExists = p.Server.Records.Find(func(job *m7s.RecordJob) bool {
return job.StreamPath == req.StreamPath && job.RecConf.FilePath == req.FilePath
})
if recordExists {
err = pkg.ErrRecordExists
return
}
recordConf := config.Record{
Append: false,
Fragment: fragment,
FilePath: filePath,
Type: "flv",
}
var stream *m7s.Publisher
var ok bool
if stream, ok = p.Server.Streams.SafeGet(req.StreamPath); !ok {
var sub *m7s.Subscriber
sub, err = p.Subscribe(ctx, req.StreamPath)
if err != nil || sub == nil {
err = pkg.ErrNotFound
return
}
defer sub.Stop(task.ErrAutoStop)
if stream, ok = p.Server.Streams.SafeGet(req.StreamPath); !ok {
err = pkg.ErrNotFound
return
}
}
job := p.Record(stream, recordConf, nil)
res.Data = uint64(job.GetTaskPointer())
err = job.WaitStarted()
return
}
+157 -20
View File
@@ -10,7 +10,7 @@ import (
_ "google.golang.org/genproto/googleapis/api/annotations"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
_ "google.golang.org/protobuf/types/known/durationpb"
durationpb "google.golang.org/protobuf/types/known/durationpb"
emptypb "google.golang.org/protobuf/types/known/emptypb"
_ "google.golang.org/protobuf/types/known/timestamppb"
pb "m7s.live/v5/pb"
@@ -194,6 +194,126 @@ func (x *ReqRecordDelete) GetRange() string {
return ""
}
type ReqStartRecord struct {
state protoimpl.MessageState `protogen:"open.v1"`
StreamPath string `protobuf:"bytes,1,opt,name=streamPath,proto3" json:"streamPath,omitempty"`
Fragment *durationpb.Duration `protobuf:"bytes,2,opt,name=fragment,proto3" json:"fragment,omitempty"`
FilePath string `protobuf:"bytes,3,opt,name=filePath,proto3" json:"filePath,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *ReqStartRecord) Reset() {
*x = ReqStartRecord{}
mi := &file_flv_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *ReqStartRecord) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ReqStartRecord) ProtoMessage() {}
func (x *ReqStartRecord) ProtoReflect() protoreflect.Message {
mi := &file_flv_proto_msgTypes[2]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ReqStartRecord.ProtoReflect.Descriptor instead.
func (*ReqStartRecord) Descriptor() ([]byte, []int) {
return file_flv_proto_rawDescGZIP(), []int{2}
}
func (x *ReqStartRecord) GetStreamPath() string {
if x != nil {
return x.StreamPath
}
return ""
}
func (x *ReqStartRecord) GetFragment() *durationpb.Duration {
if x != nil {
return x.Fragment
}
return nil
}
func (x *ReqStartRecord) GetFilePath() string {
if x != nil {
return x.FilePath
}
return ""
}
type ResponseStartRecord struct {
state protoimpl.MessageState `protogen:"open.v1"`
Code int32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"`
Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
Data uint64 `protobuf:"varint,3,opt,name=data,proto3" json:"data,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *ResponseStartRecord) Reset() {
*x = ResponseStartRecord{}
mi := &file_flv_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *ResponseStartRecord) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ResponseStartRecord) ProtoMessage() {}
func (x *ResponseStartRecord) ProtoReflect() protoreflect.Message {
mi := &file_flv_proto_msgTypes[3]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ResponseStartRecord.ProtoReflect.Descriptor instead.
func (*ResponseStartRecord) Descriptor() ([]byte, []int) {
return file_flv_proto_rawDescGZIP(), []int{3}
}
func (x *ResponseStartRecord) GetCode() int32 {
if x != nil {
return x.Code
}
return 0
}
func (x *ResponseStartRecord) GetMessage() string {
if x != nil {
return x.Message
}
return ""
}
func (x *ResponseStartRecord) GetData() uint64 {
if x != nil {
return x.Data
}
return 0
}
var File_flv_proto protoreflect.FileDescriptor
const file_flv_proto_rawDesc = "" +
@@ -216,11 +336,22 @@ const file_flv_proto_rawDesc = "" +
"\x03ids\x18\x02 \x03(\rR\x03ids\x12\x1c\n" +
"\tstartTime\x18\x03 \x01(\tR\tstartTime\x12\x18\n" +
"\aendTime\x18\x04 \x01(\tR\aendTime\x12\x14\n" +
"\x05range\x18\x05 \x01(\tR\x05range2\x9e\x02\n" +
"\x05range\x18\x05 \x01(\tR\x05range\"\x83\x01\n" +
"\x0eReqStartRecord\x12\x1e\n" +
"\n" +
"streamPath\x18\x01 \x01(\tR\n" +
"streamPath\x125\n" +
"\bfragment\x18\x02 \x01(\v2\x19.google.protobuf.DurationR\bfragment\x12\x1a\n" +
"\bfilePath\x18\x03 \x01(\tR\bfilePath\"W\n" +
"\x13ResponseStartRecord\x12\x12\n" +
"\x04code\x18\x01 \x01(\x05R\x04code\x12\x18\n" +
"\amessage\x18\x02 \x01(\tR\amessage\x12\x12\n" +
"\x04data\x18\x03 \x01(\x04R\x04data2\x87\x03\n" +
"\x03api\x12]\n" +
"\x04List\x12\x12.flv.ReqRecordList\x1a\x1a.global.RecordResponseList\"%\x82\xd3\xe4\x93\x02\x1f\x12\x1d/flv/api/list/{streamPath=**}\x12T\n" +
"\aCatalog\x12\x16.google.protobuf.Empty\x1a\x17.global.ResponseCatalog\"\x18\x82\xd3\xe4\x93\x02\x12\x12\x10/flv/api/catalog\x12b\n" +
"\x06Delete\x12\x14.flv.ReqRecordDelete\x1a\x16.global.ResponseDelete\"*\x82\xd3\xe4\x93\x02$:\x01*\"\x1f/flv/api/delete/{streamPath=**}B\x1bZ\x19m7s.live/v5/plugin/flv/pbb\x06proto3"
"\x06Delete\x12\x14.flv.ReqRecordDelete\x1a\x16.global.ResponseDelete\"*\x82\xd3\xe4\x93\x02$:\x01*\"\x1f/flv/api/delete/{streamPath=**}\x12g\n" +
"\vStartRecord\x12\x13.flv.ReqStartRecord\x1a\x18.flv.ResponseStartRecord\")\x82\xd3\xe4\x93\x02#:\x01*\"\x1e/flv/api/start/{streamPath=**}B\x1bZ\x19m7s.live/v5/plugin/flv/pbb\x06proto3"
var (
file_flv_proto_rawDescOnce sync.Once
@@ -234,27 +365,33 @@ func file_flv_proto_rawDescGZIP() []byte {
return file_flv_proto_rawDescData
}
var file_flv_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_flv_proto_msgTypes = make([]protoimpl.MessageInfo, 4)
var file_flv_proto_goTypes = []any{
(*ReqRecordList)(nil), // 0: flv.ReqRecordList
(*ReqRecordDelete)(nil), // 1: flv.ReqRecordDelete
(*emptypb.Empty)(nil), // 2: google.protobuf.Empty
(*pb.RecordResponseList)(nil), // 3: global.RecordResponseList
(*pb.ResponseCatalog)(nil), // 4: global.ResponseCatalog
(*pb.ResponseDelete)(nil), // 5: global.ResponseDelete
(*ReqStartRecord)(nil), // 2: flv.ReqStartRecord
(*ResponseStartRecord)(nil), // 3: flv.ResponseStartRecord
(*durationpb.Duration)(nil), // 4: google.protobuf.Duration
(*emptypb.Empty)(nil), // 5: google.protobuf.Empty
(*pb.RecordResponseList)(nil), // 6: global.RecordResponseList
(*pb.ResponseCatalog)(nil), // 7: global.ResponseCatalog
(*pb.ResponseDelete)(nil), // 8: global.ResponseDelete
}
var file_flv_proto_depIdxs = []int32{
0, // 0: flv.api.List:input_type -> flv.ReqRecordList
2, // 1: flv.api.Catalog:input_type -> google.protobuf.Empty
1, // 2: flv.api.Delete:input_type -> flv.ReqRecordDelete
3, // 3: flv.api.List:output_type -> global.RecordResponseList
4, // 4: flv.api.Catalog:output_type -> global.ResponseCatalog
5, // 5: flv.api.Delete:output_type -> global.ResponseDelete
3, // [3:6] is the sub-list for method output_type
0, // [0:3] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
4, // 0: flv.ReqStartRecord.fragment:type_name -> google.protobuf.Duration
0, // 1: flv.api.List:input_type -> flv.ReqRecordList
5, // 2: flv.api.Catalog:input_type -> google.protobuf.Empty
1, // 3: flv.api.Delete:input_type -> flv.ReqRecordDelete
2, // 4: flv.api.StartRecord:input_type -> flv.ReqStartRecord
6, // 5: flv.api.List:output_type -> global.RecordResponseList
7, // 6: flv.api.Catalog:output_type -> global.ResponseCatalog
8, // 7: flv.api.Delete:output_type -> global.ResponseDelete
3, // 8: flv.api.StartRecord:output_type -> flv.ResponseStartRecord
5, // [5:9] is the sub-list for method output_type
1, // [1:5] is the sub-list for method input_type
1, // [1:1] is the sub-list for extension type_name
1, // [1:1] is the sub-list for extension extendee
0, // [0:1] is the sub-list for field type_name
}
func init() { file_flv_proto_init() }
@@ -268,7 +405,7 @@ func file_flv_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: unsafe.Slice(unsafe.StringData(file_flv_proto_rawDesc), len(file_flv_proto_rawDesc)),
NumEnums: 0,
NumMessages: 2,
NumMessages: 4,
NumExtensions: 0,
NumServices: 1,
},
+145 -131
View File
@@ -10,6 +10,7 @@ package pb
import (
"context"
"errors"
"io"
"net/http"
@@ -25,176 +26,183 @@ import (
)
// Suppress "imported and not used" errors
var _ codes.Code
var _ io.Reader
var _ status.Status
var _ = runtime.String
var _ = utilities.NewDoubleArray
var _ = metadata.Join
var (
filter_Api_List_0 = &utilities.DoubleArray{Encoding: map[string]int{"streamPath": 0}, Base: []int{1, 1, 0}, Check: []int{0, 1, 2}}
_ codes.Code
_ io.Reader
_ status.Status
_ = errors.New
_ = runtime.String
_ = utilities.NewDoubleArray
_ = metadata.Join
)
var filter_Api_List_0 = &utilities.DoubleArray{Encoding: map[string]int{"streamPath": 0}, Base: []int{1, 1, 0}, Check: []int{0, 1, 2}}
func request_Api_List_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq ReqRecordList
var metadata runtime.ServerMetadata
var (
val string
ok bool
err error
_ = err
protoReq ReqRecordList
metadata runtime.ServerMetadata
err error
)
val, ok = pathParams["streamPath"]
io.Copy(io.Discard, req.Body)
val, ok := pathParams["streamPath"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "streamPath")
}
protoReq.StreamPath, err = runtime.String(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "streamPath", err)
}
if err := req.ParseForm(); err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
if err := runtime.PopulateQueryParameters(&protoReq, req.Form, filter_Api_List_0); err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := client.List(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Api_List_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq ReqRecordList
var metadata runtime.ServerMetadata
var (
val string
ok bool
err error
_ = err
protoReq ReqRecordList
metadata runtime.ServerMetadata
err error
)
val, ok = pathParams["streamPath"]
val, ok := pathParams["streamPath"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "streamPath")
}
protoReq.StreamPath, err = runtime.String(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "streamPath", err)
}
if err := req.ParseForm(); err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
if err := runtime.PopulateQueryParameters(&protoReq, req.Form, filter_Api_List_0); err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := server.List(ctx, &protoReq)
return msg, metadata, err
}
func request_Api_Catalog_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq emptypb.Empty
var metadata runtime.ServerMetadata
var (
protoReq emptypb.Empty
metadata runtime.ServerMetadata
)
io.Copy(io.Discard, req.Body)
msg, err := client.Catalog(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Api_Catalog_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq emptypb.Empty
var metadata runtime.ServerMetadata
var (
protoReq emptypb.Empty
metadata runtime.ServerMetadata
)
msg, err := server.Catalog(ctx, &protoReq)
return msg, metadata, err
}
func request_Api_Delete_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq ReqRecordDelete
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF {
var (
protoReq ReqRecordDelete
metadata runtime.ServerMetadata
err error
)
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && !errors.Is(err, io.EOF) {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
var (
val string
ok bool
err error
_ = err
)
val, ok = pathParams["streamPath"]
val, ok := pathParams["streamPath"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "streamPath")
}
protoReq.StreamPath, err = runtime.String(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "streamPath", err)
}
msg, err := client.Delete(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Api_Delete_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq ReqRecordDelete
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF {
var (
protoReq ReqRecordDelete
metadata runtime.ServerMetadata
err error
)
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && !errors.Is(err, io.EOF) {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
var (
val string
ok bool
err error
_ = err
)
val, ok = pathParams["streamPath"]
val, ok := pathParams["streamPath"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "streamPath")
}
protoReq.StreamPath, err = runtime.String(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "streamPath", err)
}
msg, err := server.Delete(ctx, &protoReq)
return msg, metadata, err
}
func request_Api_StartRecord_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var (
protoReq ReqStartRecord
metadata runtime.ServerMetadata
err error
)
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && !errors.Is(err, io.EOF) {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
val, ok := pathParams["streamPath"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "streamPath")
}
protoReq.StreamPath, err = runtime.String(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "streamPath", err)
}
msg, err := client.StartRecord(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Api_StartRecord_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var (
protoReq ReqStartRecord
metadata runtime.ServerMetadata
err error
)
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && !errors.Is(err, io.EOF) {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
val, ok := pathParams["streamPath"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "streamPath")
}
protoReq.StreamPath, err = runtime.String(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "streamPath", err)
}
msg, err := server.StartRecord(ctx, &protoReq)
return msg, metadata, err
}
// RegisterApiHandlerServer registers the http handlers for service Api to "mux".
// UnaryRPC :call ApiServer directly.
// StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906.
// Note that using this registration option will cause many gRPC library features to stop working. Consider using RegisterApiHandlerFromEndpoint instead.
// GRPC interceptors will not work for this type of registration. To use interceptors, you must use the "runtime.WithMiddlewares" option in the "runtime.NewServeMux" call.
func RegisterApiHandlerServer(ctx context.Context, mux *runtime.ServeMux, server ApiServer) error {
mux.Handle("GET", pattern_Api_List_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
mux.Handle(http.MethodGet, pattern_Api_List_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/flv.Api/List", runtime.WithHTTPPathPattern("/flv/api/list/{streamPath=**}"))
annotatedContext, err := runtime.AnnotateIncomingContext(ctx, mux, req, "/flv.Api/List", runtime.WithHTTPPathPattern("/flv/api/list/{streamPath=**}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
@@ -206,20 +214,15 @@ func RegisterApiHandlerServer(ctx context.Context, mux *runtime.ServeMux, server
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_List_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("GET", pattern_Api_Catalog_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
mux.Handle(http.MethodGet, pattern_Api_Catalog_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/flv.Api/Catalog", runtime.WithHTTPPathPattern("/flv/api/catalog"))
annotatedContext, err := runtime.AnnotateIncomingContext(ctx, mux, req, "/flv.Api/Catalog", runtime.WithHTTPPathPattern("/flv/api/catalog"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
@@ -231,20 +234,15 @@ func RegisterApiHandlerServer(ctx context.Context, mux *runtime.ServeMux, server
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_Catalog_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_Api_Delete_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
mux.Handle(http.MethodPost, pattern_Api_Delete_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/flv.Api/Delete", runtime.WithHTTPPathPattern("/flv/api/delete/{streamPath=**}"))
annotatedContext, err := runtime.AnnotateIncomingContext(ctx, mux, req, "/flv.Api/Delete", runtime.WithHTTPPathPattern("/flv/api/delete/{streamPath=**}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
@@ -256,9 +254,27 @@ func RegisterApiHandlerServer(ctx context.Context, mux *runtime.ServeMux, server
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_Delete_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle(http.MethodPost, pattern_Api_StartRecord_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
annotatedContext, err := runtime.AnnotateIncomingContext(ctx, mux, req, "/flv.Api/StartRecord", runtime.WithHTTPPathPattern("/flv/api/start/{streamPath=**}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Api_StartRecord_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_StartRecord_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
return nil
@@ -267,25 +283,24 @@ func RegisterApiHandlerServer(ctx context.Context, mux *runtime.ServeMux, server
// RegisterApiHandlerFromEndpoint is same as RegisterApiHandler but
// automatically dials to "endpoint" and closes the connection when "ctx" gets done.
func RegisterApiHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) (err error) {
conn, err := grpc.DialContext(ctx, endpoint, opts...)
conn, err := grpc.NewClient(endpoint, opts...)
if err != nil {
return err
}
defer func() {
if err != nil {
if cerr := conn.Close(); cerr != nil {
grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr)
grpclog.Errorf("Failed to close conn to %s: %v", endpoint, cerr)
}
return
}
go func() {
<-ctx.Done()
if cerr := conn.Close(); cerr != nil {
grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr)
grpclog.Errorf("Failed to close conn to %s: %v", endpoint, cerr)
}
}()
}()
return RegisterApiHandler(ctx, mux, conn)
}
@@ -299,16 +314,13 @@ func RegisterApiHandler(ctx context.Context, mux *runtime.ServeMux, conn *grpc.C
// to "mux". The handlers forward requests to the grpc endpoint over the given implementation of "ApiClient".
// Note: the gRPC framework executes interceptors within the gRPC handler. If the passed in "ApiClient"
// doesn't go through the normal gRPC flow (creating a gRPC client etc.) then it will be up to the passed in
// "ApiClient" to call the correct interceptors.
// "ApiClient" to call the correct interceptors. This client ignores the HTTP middlewares.
func RegisterApiHandlerClient(ctx context.Context, mux *runtime.ServeMux, client ApiClient) error {
mux.Handle("GET", pattern_Api_List_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
mux.Handle(http.MethodGet, pattern_Api_List_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/flv.Api/List", runtime.WithHTTPPathPattern("/flv/api/list/{streamPath=**}"))
annotatedContext, err := runtime.AnnotateContext(ctx, mux, req, "/flv.Api/List", runtime.WithHTTPPathPattern("/flv/api/list/{streamPath=**}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
@@ -319,18 +331,13 @@ func RegisterApiHandlerClient(ctx context.Context, mux *runtime.ServeMux, client
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_List_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("GET", pattern_Api_Catalog_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
mux.Handle(http.MethodGet, pattern_Api_Catalog_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/flv.Api/Catalog", runtime.WithHTTPPathPattern("/flv/api/catalog"))
annotatedContext, err := runtime.AnnotateContext(ctx, mux, req, "/flv.Api/Catalog", runtime.WithHTTPPathPattern("/flv/api/catalog"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
@@ -341,18 +348,13 @@ func RegisterApiHandlerClient(ctx context.Context, mux *runtime.ServeMux, client
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_Catalog_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_Api_Delete_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
mux.Handle(http.MethodPost, pattern_Api_Delete_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/flv.Api/Delete", runtime.WithHTTPPathPattern("/flv/api/delete/{streamPath=**}"))
annotatedContext, err := runtime.AnnotateContext(ctx, mux, req, "/flv.Api/Delete", runtime.WithHTTPPathPattern("/flv/api/delete/{streamPath=**}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
@@ -363,26 +365,38 @@ func RegisterApiHandlerClient(ctx context.Context, mux *runtime.ServeMux, client
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_Delete_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle(http.MethodPost, pattern_Api_StartRecord_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
annotatedContext, err := runtime.AnnotateContext(ctx, mux, req, "/flv.Api/StartRecord", runtime.WithHTTPPathPattern("/flv/api/start/{streamPath=**}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Api_StartRecord_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_StartRecord_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
return nil
}
var (
pattern_Api_List_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 3, 0, 4, 1, 5, 3}, []string{"flv", "api", "list", "streamPath"}, ""))
pattern_Api_Catalog_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"flv", "api", "catalog"}, ""))
pattern_Api_Delete_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 3, 0, 4, 1, 5, 3}, []string{"flv", "api", "delete", "streamPath"}, ""))
pattern_Api_List_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 3, 0, 4, 1, 5, 3}, []string{"flv", "api", "list", "streamPath"}, ""))
pattern_Api_Catalog_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"flv", "api", "catalog"}, ""))
pattern_Api_Delete_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 3, 0, 4, 1, 5, 3}, []string{"flv", "api", "delete", "streamPath"}, ""))
pattern_Api_StartRecord_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 3, 0, 4, 1, 5, 3}, []string{"flv", "api", "start", "streamPath"}, ""))
)
var (
forward_Api_List_0 = runtime.ForwardResponseMessage
forward_Api_Catalog_0 = runtime.ForwardResponseMessage
forward_Api_Delete_0 = runtime.ForwardResponseMessage
forward_Api_List_0 = runtime.ForwardResponseMessage
forward_Api_Catalog_0 = runtime.ForwardResponseMessage
forward_Api_Delete_0 = runtime.ForwardResponseMessage
forward_Api_StartRecord_0 = runtime.ForwardResponseMessage
)
+19 -1
View File
@@ -24,6 +24,12 @@ service api {
body: "*"
};
}
rpc StartRecord (ReqStartRecord) returns (ResponseStartRecord) {
option (google.api.http) = {
post: "/flv/api/start/{streamPath=**}"
body: "*"
};
}
}
message ReqRecordList {
@@ -42,4 +48,16 @@ message ReqRecordDelete {
string startTime = 3;
string endTime = 4;
string range = 5;
}
}
message ReqStartRecord {
string streamPath = 1;
google.protobuf.Duration fragment = 2;
string filePath = 3;
}
message ResponseStartRecord {
int32 code = 1;
string message = 2;
uint64 data = 3;
}
+41 -3
View File
@@ -21,9 +21,10 @@ import (
const _ = grpc.SupportPackageIsVersion9
const (
Api_List_FullMethodName = "/flv.api/List"
Api_Catalog_FullMethodName = "/flv.api/Catalog"
Api_Delete_FullMethodName = "/flv.api/Delete"
Api_List_FullMethodName = "/flv.api/List"
Api_Catalog_FullMethodName = "/flv.api/Catalog"
Api_Delete_FullMethodName = "/flv.api/Delete"
Api_StartRecord_FullMethodName = "/flv.api/StartRecord"
)
// ApiClient is the client API for Api service.
@@ -33,6 +34,7 @@ type ApiClient interface {
List(ctx context.Context, in *ReqRecordList, opts ...grpc.CallOption) (*pb.RecordResponseList, error)
Catalog(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*pb.ResponseCatalog, error)
Delete(ctx context.Context, in *ReqRecordDelete, opts ...grpc.CallOption) (*pb.ResponseDelete, error)
StartRecord(ctx context.Context, in *ReqStartRecord, opts ...grpc.CallOption) (*ResponseStartRecord, error)
}
type apiClient struct {
@@ -73,6 +75,16 @@ func (c *apiClient) Delete(ctx context.Context, in *ReqRecordDelete, opts ...grp
return out, nil
}
func (c *apiClient) StartRecord(ctx context.Context, in *ReqStartRecord, opts ...grpc.CallOption) (*ResponseStartRecord, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(ResponseStartRecord)
err := c.cc.Invoke(ctx, Api_StartRecord_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
// ApiServer is the server API for Api service.
// All implementations must embed UnimplementedApiServer
// for forward compatibility.
@@ -80,6 +92,7 @@ type ApiServer interface {
List(context.Context, *ReqRecordList) (*pb.RecordResponseList, error)
Catalog(context.Context, *emptypb.Empty) (*pb.ResponseCatalog, error)
Delete(context.Context, *ReqRecordDelete) (*pb.ResponseDelete, error)
StartRecord(context.Context, *ReqStartRecord) (*ResponseStartRecord, error)
mustEmbedUnimplementedApiServer()
}
@@ -99,6 +112,9 @@ func (UnimplementedApiServer) Catalog(context.Context, *emptypb.Empty) (*pb.Resp
func (UnimplementedApiServer) Delete(context.Context, *ReqRecordDelete) (*pb.ResponseDelete, error) {
return nil, status.Errorf(codes.Unimplemented, "method Delete not implemented")
}
func (UnimplementedApiServer) StartRecord(context.Context, *ReqStartRecord) (*ResponseStartRecord, error) {
return nil, status.Errorf(codes.Unimplemented, "method StartRecord not implemented")
}
func (UnimplementedApiServer) mustEmbedUnimplementedApiServer() {}
func (UnimplementedApiServer) testEmbeddedByValue() {}
@@ -174,6 +190,24 @@ func _Api_Delete_Handler(srv interface{}, ctx context.Context, dec func(interfac
return interceptor(ctx, in, info, handler)
}
func _Api_StartRecord_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ReqStartRecord)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ApiServer).StartRecord(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: Api_StartRecord_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).StartRecord(ctx, req.(*ReqStartRecord))
}
return interceptor(ctx, in, info, handler)
}
// Api_ServiceDesc is the grpc.ServiceDesc for Api service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@@ -193,6 +227,10 @@ var Api_ServiceDesc = grpc.ServiceDesc{
MethodName: "Delete",
Handler: _Api_Delete_Handler,
},
{
MethodName: "StartRecord",
Handler: _Api_StartRecord_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "flv.proto",
+16 -5
View File
@@ -87,15 +87,26 @@ func (p *RecordReader) Run() (err error) {
if st.GetKey() == targetType {
if localStorage, ok := st.(*storage.LocalStorage); ok {
filePath = localStorage.GetFullPath(filePath, stream.StorageLevel)
p.File, err = os.Open(filePath)
if err != nil {
continue
}
} else {
filePath, err = st.GetURL(p, stream.StreamPath)
if err != nil {
continue
}
p.File, err = st.OpenFile(p, filePath)
}
} else {
p.Warn("storage type mismatch, fallback to relative path", "streamType", stream.StorageType, "globalType", st.GetKey(), "path", filePath)
}
}
}
p.File, err = os.Open(filePath)
if err != nil {
continue
// 如果 storage 已经通过 OpenFile 打开了文件(storage.File),则直接使用;否则尝试本地打开路径
if p.File == nil {
p.File, err = os.Open(filePath)
if err != nil {
continue
}
}
if p.reader != nil {
p.reader.Recycle()
+23
View File
@@ -178,6 +178,29 @@ func (r *Recorder) createStream(start time.Time) (err error) {
if err != nil {
return
}
// 写入序列头(如果已知)以保证每个分片可独立回放
// 优先使用 Subscriber 的 VideoReader/AudioReader 的 codec context 的 sequence frame
sub := r.RecordJob.Subscriber
if sub != nil {
// 视频序列头
if sub.VideoReader != nil && sub.VideoReader.Track != nil && sub.VideoReader.Track.ICodecCtx != nil {
if seqCtx, ok := sub.VideoReader.Track.ICodecCtx.(pkg.ISequenceCodecCtx[*rtmp.VideoFrame]); ok {
seq := seqCtx.GetSequenceFrame()
if seq != nil && seq.Size > 0 {
_ = r.writer.WriteTag(FLV_TAG_TYPE_VIDEO, 0, uint32(seq.Size), seq.Buffers...)
}
}
}
// 音频序列头
if sub.AudioReader != nil && sub.AudioReader.Track != nil && sub.AudioReader.Track.ICodecCtx != nil {
if seqCtx, ok := sub.AudioReader.Track.ICodecCtx.(pkg.ISequenceCodecCtx[*rtmp.AudioFrame]); ok {
seq := seqCtx.GetSequenceFrame()
if seq != nil && seq.Size > 0 {
_ = r.writer.WriteTag(FLV_TAG_TYPE_AUDIO, 0, uint32(seq.Size), seq.Buffers...)
}
}
}
}
return
}
+2 -1
View File
@@ -17,6 +17,7 @@ import (
pkg "m7s.live/v5/pkg"
"m7s.live/v5/pkg/config"
"m7s.live/v5/pkg/format"
"m7s.live/v5/pkg/storage"
"m7s.live/v5/pkg/util"
)
@@ -57,7 +58,7 @@ type (
PullJob PullJob
PullStartTime, PullEndTime time.Time
Streams []RecordStream
File *os.File
File storage.File
MaxTS int64
seekChan chan time.Time
Type string