feat: add task stop and restart api

This commit is contained in:
langhuihui
2024-11-13 14:30:22 +08:00
parent b6d4d8ae0f
commit 69638cefa2
6 changed files with 1011 additions and 610 deletions
+24 -5
View File
@@ -11,6 +11,7 @@ import (
"runtime"
"strings"
"time"
"unsafe"
"m7s.live/v5/pkg/task"
@@ -163,7 +164,7 @@ func (s *Server) StreamInfo(ctx context.Context, req *pb.StreamSnapRequest) (res
func (s *Server) TaskTree(context.Context, *emptypb.Empty) (res *pb.TaskTreeResponse, err error) {
var fillData func(m task.ITask) *pb.TaskTreeData
fillData = func(m task.ITask) (res *pb.TaskTreeData) {
res = &pb.TaskTreeData{Id: m.GetTaskID(), State: uint32(m.GetState()), Type: uint32(m.GetTaskType()), Owner: m.GetOwnerType(), StartTime: timestamppb.New(m.GetTask().StartTime), Description: m.GetDescriptions()}
res = &pb.TaskTreeData{Id: m.GetTaskID(), Pointer: uint64(uintptr(unsafe.Pointer(m.GetTask()))), State: uint32(m.GetState()), Type: uint32(m.GetTaskType()), Owner: m.GetOwnerType(), StartTime: timestamppb.New(m.GetTask().StartTime), Description: m.GetDescriptions()}
if job, ok := m.(task.IJob); ok {
if blockedTask := job.Blocked(); blockedTask != nil {
res.Blocked = fillData(blockedTask)
@@ -178,6 +179,24 @@ func (s *Server) TaskTree(context.Context, *emptypb.Empty) (res *pb.TaskTreeResp
return
}
func (s *Server) StopTask(ctx context.Context, req *pb.RequestWithId64) (resp *pb.SuccessResponse, err error) {
t := (*task.Task)(unsafe.Pointer(uintptr(req.Id)))
if t == nil {
return nil, pkg.ErrNotFound
}
t.Stop(task.ErrStopByUser)
return &pb.SuccessResponse{}, nil
}
func (s *Server) RestartTask(ctx context.Context, req *pb.RequestWithId64) (resp *pb.SuccessResponse, err error) {
t := (*task.Task)(unsafe.Pointer(uintptr(req.Id)))
if t == nil {
return nil, pkg.ErrNotFound
}
t.Stop(task.ErrRestart)
return &pb.SuccessResponse{}, nil
}
func (s *Server) GetRecording(ctx context.Context, req *emptypb.Empty) (resp *pb.RecordingListResponse, err error) {
if s.DB == nil {
err = pkg.ErrNoDB
@@ -373,20 +392,20 @@ func (s *Server) VideoTrackSnap(ctx context.Context, req *pb.StreamSnapRequest)
return
}
func (s *Server) Restart(ctx context.Context, req *pb.RequestWithId) (res *emptypb.Empty, err error) {
func (s *Server) Restart(ctx context.Context, req *pb.RequestWithId) (res *pb.SuccessResponse, err error) {
if s, ok := Servers.Get(req.Id); ok {
s.Stop(pkg.ErrRestart)
}
return empty, err
return &pb.SuccessResponse{}, err
}
func (s *Server) Shutdown(ctx context.Context, req *pb.RequestWithId) (res *emptypb.Empty, err error) {
func (s *Server) Shutdown(ctx context.Context, req *pb.RequestWithId) (res *pb.SuccessResponse, err error) {
if s, ok := Servers.Get(req.Id); ok {
s.Stop(task.ErrStopByUser)
} else {
return nil, pkg.ErrNotFound
}
return empty, err
return &pb.SuccessResponse{}, err
}
func (s *Server) ChangeSubscribe(ctx context.Context, req *pb.ChangeSubscribeRequest) (res *pb.SuccessResponse, err error) {