feat: add storage to records

This commit is contained in:
langhuihui
2025-09-25 09:34:17 +08:00
parent 526d2799bb
commit c0a13cbbf2
24 changed files with 1743 additions and 2089 deletions
+2 -2
View File
@@ -1,5 +1,5 @@
global:
# loglevel: debug
loglevel: debug
http:
listenaddr: :8081
listenaddrtls: :8555
@@ -10,4 +10,4 @@ rtsp:
rtmp:
tcp: :1936
webrtc:
enable: false
port: udp:9000-9100
+25 -13
View File
@@ -6,11 +6,21 @@ global:
loglevel: debug
admin:
enablelogin: false
# pullproxy:
# - id: 1 # 唯一ID标识,必须大于0
# name: "camera-1" # 拉流代理名称
# type: "rtmp" # 拉流协议类型
# pull:
# url: "rtmp://example.com/live/stream1" # 拉流源地址
# streampath: "live/camera1" # 在Monibuca中的流路径
# pullonstart: true # 是否在启动时自动开始拉流
# description: "前门摄像头" # 描述信息
debug:
enableTaskHistory: true #是否启用任务历史记录
srt:
listenaddr: :6000
passphrase: foobarfoobar
# passphrase: foobarfoobar
gb28181:
enable: false # 是否启用GB28181协议
autoinvite: false #建议使用false,开启后会自动邀请设备推流
@@ -55,6 +65,15 @@ mp4:
# ^live/.+:
# fragment: 10s
# filepath: record/$0
# storage:
# s3:
# endpoint: "storage-dev.xiding.tech"
# accessKeyId: "xidinguser"
# secretAccessKey: "U2FsdGVkX1/7uyvj0trCzSNFsfDZ66dMSAEZjNlvW1c="
# bucket: "vidu-media-bucket"
# pathPrefix: ""
# forcePathStyle: true
# useSSL: true
# pull:
# live/test: /Users/dexter/Movies/1744963190.mp4
onsub:
@@ -89,18 +108,11 @@ hls:
# onpub:
# transform:
# .* : 5s x 3
s3:
enable: false
auto: true # 启用自动上传
deleteAfterUpload: false # 上传后保留本地文件
endpoint: "storage-dev.xiding.tech"
accessKeyId: "xidinguser"
secretAccessKey: "U2FsdGVkX1/7uyvj0trCzSNFsfDZ66dMSAEZjNlvW1c="
bucket: "vidu-media-bucket"
pathPrefix: "recordings"
forcePathStyle: true
useSSL: true
webrtc:
port: udp:9000-9100
# onpub:
# push:
# .*: http://localhost:8081/webrtc/push/$0
rtsp:
# pull:
+1 -2
View File
@@ -4,7 +4,6 @@ import (
"encoding/json"
"fmt"
"log/slog"
"maps"
"os"
"reflect"
"regexp"
@@ -432,5 +431,5 @@ func (config *Config) assign(k string, v any) reflect.Value {
func Parse(target any, conf map[string]any) {
var c Config
c.Parse(target)
c.ParseModifyFile(maps.Clone(conf))
c.ParseModifyFile(conf)
}
+10 -8
View File
@@ -60,7 +60,8 @@ type (
EventLevel = string
RecordMode = string
HookType = string
Publish struct {
Publish struct {
MaxCount int `default:"0" desc:"最大发布者数量"` // 最大发布者数量
PubAudio bool `default:"true" desc:"是否发布音频"`
PubVideo bool `default:"true" desc:"是否发布视频"`
@@ -121,13 +122,14 @@ type (
EventName string `json:"eventName" desc:"事件名称" gorm:"type:varchar(255);comment:事件名称"`
}
Record struct {
Mode RecordMode `json:"mode" desc:"事件类型,auto=连续录像模式,event=事件录像模式" gorm:"type:varchar(255);comment:事件类型,auto=连续录像模式,event=事件录像模式;default:'auto'"`
Type string `desc:"录制类型"` // 录制类型 mp4、flv、hls、hlsv7
FilePath string `desc:"录制文件路径"` // 录制文件路径
Fragment time.Duration `desc:"分片时长"` // 分片时长
RealTime bool `desc:"是否实时录制"` // 是否实时录制
Append bool `desc:"是否追加录制"` // 是否追加录制
Event *RecordEvent `json:"event" desc:"事件录像配置" gorm:"-"` // 事件录像配置
Mode RecordMode `json:"mode" desc:"事件类型,auto=连续录像模式,event=事件录像模式" gorm:"type:varchar(255);comment:事件类型,auto=连续录像模式,event=事件录像模式;default:'auto'"`
Type string `desc:"录制类型"` // 录制类型 mp4、flv、hls、hlsv7
FilePath string `desc:"录制文件路径"` // 录制文件路径
Fragment time.Duration `desc:"分片时长"` // 分片时长
RealTime bool `desc:"是否实时录制"` // 是否实时录制
Append bool `desc:"是否追加录制"` // 是否追加录制
Event *RecordEvent `json:"event" desc:"事件录像配置" gorm:"-"` // 事件录像配置
Storage map[string]any `json:"storage" desc:"存储配置" gorm:"-"` // 存储配置
}
TransfromOutput struct {
Target string `desc:"转码目标"` // 转码目标
+214
View File
@@ -0,0 +1,214 @@
# Storage Package
这个包提供了统一的存储接口,支持多种存储后端,包括本地存储、S3、OSS和COS。
## 条件编译
除本地存储外,每种存储类型都使用条件编译,只有在指定相应的 build tag 时才会被编译:
- `local`: 本地文件系统存储(默认编译,无需 build tag)
- `s3`: Amazon S3存储
- `oss`: 阿里云OSS存储
- `cos`: 腾讯云COS存储
## 使用方法
### 编译时指定存储类型
```bash
# 只编译本地存储(默认包含,无需额外tags)
go build
# 只编译S3存储
go build -tags s3
# 编译多种存储类型
go build -tags "s3,oss"
# 编译所有存储类型(本地存储默认包含,无需额外 tag)
go build -tags "s3,oss,cos"
```
**注意**
- 本地存储(`local`)默认包含,无需指定build tag
- S3存储需要`-tags s3`
- OSS存储需要`-tags oss`
- COS存储需要`-tags cos`
- 可以组合多个tags来支持多种存储类型
### 代码中使用
```go
import "m7s.live/v5/pkg/storage"
// 创建本地存储
localConfig := storage.LocalStorageConfig("/path/to/storage")
localStorage, err := storage.CreateStorage("local", localConfig)
// 创建S3存储
s3Config := &storage.S3StorageConfig{
Endpoint: "s3.amazonaws.com",
Region: "us-east-1",
AccessKeyID: "your-access-key",
SecretAccessKey: "your-secret-key",
Bucket: "your-bucket",
ForcePathStyle: false, // MinIO需要设置为true
UseSSL: true,
Timeout: 30 * time.Second,
}
s3Storage, err := storage.CreateStorage("s3", s3Config)
// 创建OSS存储
ossConfig := &storage.OSSStorageConfig{
Endpoint: "oss-cn-hangzhou.aliyuncs.com",
AccessKeyID: "your-access-key-id",
AccessKeySecret: "your-access-key-secret",
Bucket: "your-bucket",
UseSSL: true,
Timeout: 30,
}
ossStorage, err := storage.CreateStorage("oss", ossConfig)
// 创建COS存储
cosConfig := &storage.COSStorageConfig{
SecretID: "your-secret-id",
SecretKey: "your-secret-key",
Region: "ap-beijing",
Bucket: "your-bucket",
UseHTTPS: true,
Timeout: 30,
}
cosStorage, err := storage.CreateStorage("cos", cosConfig)
```
## 存储类型
### Local Storage (`local`)
本地文件系统存储,不需要额外的依赖。
### S3 Storage (`s3`)
Amazon S3兼容存储,包括AWS S3和MinIO等。
依赖:
- `github.com/aws/aws-sdk-go`
### OSS Storage (`oss`)
阿里云对象存储服务。
依赖:
- `github.com/aliyun/aliyun-oss-go-sdk`
### COS Storage (`cos`)
腾讯云对象存储服务。
依赖:
- `github.com/tencentyun/cos-go-sdk-v5`
## 工厂模式
存储包使用工厂模式来创建不同类型的存储实例:
```go
var Factory = map[string]func(any) (Storage, error){}
```
每种存储类型在各自的文件中通过`init()`函数注册到工厂中:
- `local.go`: 注册本地存储工厂函数
- `s3.go`: 注册S3存储工厂函数(需要`-tags s3`)
- `oss.go`: 注册OSS存储工厂函数(需要`-tags oss`)
- `cos.go`: 注册COS存储工厂函数(需要`-tags cos`)
使用`CreateStorage(type, config)`函数来创建存储实例,其中`type`是存储类型字符串,`config`是对应的配置对象。
## 存储接口
所有存储实现都遵循统一的`Storage`接口:
```go
type Storage interface {
// CreateFile 创建文件并返回文件句柄
CreateFile(ctx context.Context, path string) (File, error)
// Delete 删除文件
Delete(ctx context.Context, path string) error
// Exists 检查文件是否存在
Exists(ctx context.Context, path string) (bool, error)
// GetSize 获取文件大小
GetSize(ctx context.Context, path string) (int64, error)
// GetURL 获取文件访问URL
GetURL(ctx context.Context, path string) (string, error)
// List 列出文件
List(ctx context.Context, prefix string) ([]FileInfo, error)
// Close 关闭存储连接
Close() error
}
```
## 使用示例
```go
package main
import (
"context"
"fmt"
"m7s.live/v5/pkg/storage"
)
func main() {
// 创建本地存储
config := storage.LocalStorageConfig("/tmp/storage")
s, err := storage.CreateStorage("local", config)
if err != nil {
panic(err)
}
defer s.Close()
ctx := context.Background()
// 创建文件并写入内容
file, err := s.CreateFile(ctx, "test.txt")
if err != nil {
panic(err)
}
file.Write([]byte("Hello, World!"))
file.Close()
// 检查文件是否存在
exists, err := s.Exists(ctx, "test.txt")
if err != nil {
panic(err)
}
fmt.Printf("File exists: %v\n", exists)
// 获取文件大小
size, err := s.GetSize(ctx, "test.txt")
if err != nil {
panic(err)
}
fmt.Printf("File size: %d bytes\n", size)
// 列出文件
files, err := s.List(ctx, "")
if err != nil {
panic(err)
}
for _, file := range files {
fmt.Printf("File: %s, Size: %d\n", file.Name, file.Size)
}
}
```
+366
View File
@@ -0,0 +1,366 @@
//go:build cos
package storage
import (
	"context"
	"fmt"
	"net/http"
	"os"
	"strings"
	"time"

	"github.com/tencentyun/cos-go-sdk-v5"

	"m7s.live/v5/pkg/config"
)
// COSStorageConfig COS存储配置
type COSStorageConfig struct {
SecretID string `yaml:"secret_id" desc:"COS Secret ID"`
SecretKey string `yaml:"secret_key" desc:"COS Secret Key"`
Region string `yaml:"region" desc:"COS区域"`
Bucket string `yaml:"bucket" desc:"COS存储桶名称"`
PathPrefix string `yaml:"path_prefix" desc:"文件路径前缀"`
UseHTTPS bool `yaml:"use_https" desc:"是否使用HTTPS" default:"true"`
Timeout int `yaml:"timeout" desc:"上传超时时间(秒)" default:"30"`
}
func (c *COSStorageConfig) GetType() StorageType {
return StorageTypeCOS
}
func (c *COSStorageConfig) Validate() error {
if c.SecretID == "" {
return fmt.Errorf("secret_id is required for COS storage")
}
if c.SecretKey == "" {
return fmt.Errorf("secret_key is required for COS storage")
}
if c.Bucket == "" {
return fmt.Errorf("bucket is required for COS storage")
}
if c.Region == "" {
return fmt.Errorf("region is required for COS storage")
}
return nil
}
// COSStorage COS存储实现
type COSStorage struct {
config *COSStorageConfig
client *cos.Client
}
// NewCOSStorage 创建COS存储实例
func NewCOSStorage(config *COSStorageConfig) (*COSStorage, error) {
if err := config.Validate(); err != nil {
return nil, err
}
// 设置默认值
if config.Timeout == 0 {
config.Timeout = 30
}
// 构建存储桶URL
scheme := "http"
if config.UseHTTPS {
scheme = "https"
}
bucketURL := fmt.Sprintf("%s://%s.cos.%s.myqcloud.com", scheme, config.Bucket, config.Region)
// 创建COS客户端
client := cos.NewClient(&cos.BaseURL{BucketURL: bucketURL}, &http.Client{
Transport: &cos.AuthorizationTransport{
SecretID: config.SecretID,
SecretKey: config.SecretKey,
},
})
// 测试连接
if err := testCOSConnection(client, config.Bucket); err != nil {
return nil, fmt.Errorf("COS connection test failed: %w", err)
}
return &COSStorage{
config: config,
client: client,
}, nil
}
func (s *COSStorage) CreateFile(ctx context.Context, path string) (File, error) {
objectKey := s.getObjectKey(path)
return &COSFile{
storage: s,
objectKey: objectKey,
ctx: ctx,
}, nil
}
func (s *COSStorage) Delete(ctx context.Context, path string) error {
objectKey := s.getObjectKey(path)
_, err := s.client.Object.Delete(ctx, objectKey)
return err
}
func (s *COSStorage) Exists(ctx context.Context, path string) (bool, error) {
objectKey := s.getObjectKey(path)
_, err := s.client.Object.Head(ctx, objectKey, nil)
if err != nil {
// 检查是否是404错误
if strings.Contains(err.Error(), "404") || strings.Contains(err.Error(), "NoSuchKey") {
return false, nil
}
return false, err
}
return true, nil
}
func (s *COSStorage) GetSize(ctx context.Context, path string) (int64, error) {
objectKey := s.getObjectKey(path)
result, _, err := s.client.Object.Head(ctx, objectKey, nil)
if err != nil {
if strings.Contains(err.Error(), "404") || strings.Contains(err.Error(), "NoSuchKey") {
return 0, ErrFileNotFound
}
return 0, err
}
return result.ContentLength, nil
}
func (s *COSStorage) GetURL(ctx context.Context, path string) (string, error) {
objectKey := s.getObjectKey(path)
// 生成预签名URL24小时有效期
presignedURL, err := s.client.Object.GetPresignedURL(ctx, http.MethodGet, objectKey,
s.config.SecretID, s.config.SecretKey, 24*time.Hour, nil)
if err != nil {
return "", err
}
return presignedURL.String(), nil
}
func (s *COSStorage) List(ctx context.Context, prefix string) ([]FileInfo, error) {
objectPrefix := s.getObjectKey(prefix)
var files []FileInfo
opt := &cos.BucketGetOptions{
Prefix: objectPrefix,
MaxKeys: 1000,
}
result, _, err := s.client.Bucket.Get(ctx, opt)
if err != nil {
return nil, err
}
for _, obj := range result.Contents {
// 移除路径前缀
fileName := obj.Key
if s.config.PathPrefix != "" {
fileName = strings.TrimPrefix(fileName, strings.TrimSuffix(s.config.PathPrefix, "/")+"/")
}
files = append(files, FileInfo{
Name: fileName,
Size: obj.Size,
LastModified: obj.LastModified,
ETag: obj.ETag,
})
}
return files, nil
}
func (s *COSStorage) Close() error {
// COS客户端无需显式关闭
return nil
}
// getObjectKey 获取COS对象键
func (s *COSStorage) getObjectKey(path string) string {
if s.config.PathPrefix != "" {
return strings.TrimSuffix(s.config.PathPrefix, "/") + "/" + path
}
return path
}
// testCOSConnection verifies the client can reach the bucket by issuing a
// HEAD-bucket request.
//
// NOTE(review): the bucket parameter is unused — the bucket name is already
// baked into the client's BaseURL built in NewCOSStorage. The probe also
// ignores any caller context and uses context.Background — confirm intended.
func testCOSConnection(client *cos.Client, bucket string) error {
	// Probe bucket metadata as a cheap connectivity/auth check.
	_, _, err := client.Bucket.Head(context.Background())
	return err
}
// COSFile COS文件读写器
type COSFile struct {
storage *COSStorage
objectKey string
ctx context.Context
tempFile *os.File // 本地临时文件,用于支持随机访问
filePath string // 临时文件路径
}
func (f *COSFile) Name() string {
return f.objectKey
}
func (f *COSFile) Write(p []byte) (n int, err error) {
// 如果还没有创建临时文件,先创建
if f.tempFile == nil {
if err = f.createTempFile(); err != nil {
return 0, err
}
}
// 写入到临时文件
return f.tempFile.Write(p)
}
func (f *COSFile) Read(p []byte) (n int, err error) {
// 如果还没有创建缓存文件,先下载到本地
if f.tempFile == nil {
if err = f.downloadToTemp(); err != nil {
return 0, err
}
}
// 从本地缓存文件读取
return f.tempFile.Read(p)
}
func (f *COSFile) WriteAt(p []byte, off int64) (n int, err error) {
// 如果还没有创建临时文件,先创建
if f.tempFile == nil {
if err = f.createTempFile(); err != nil {
return 0, err
}
}
// 写入到临时文件的指定位置
return f.tempFile.WriteAt(p, off)
}
func (f *COSFile) ReadAt(p []byte, off int64) (n int, err error) {
// 如果还没有创建缓存文件,先下载到本地
if f.tempFile == nil {
if err = f.downloadToTemp(); err != nil {
return 0, err
}
}
// 从本地缓存文件的指定位置读取
return f.tempFile.ReadAt(p, off)
}
// Sync flushes the local temp file to disk and uploads it to COS.
//
// Fix: the original called uploadTempFile even when no temp file existed
// (nothing written or read yet), so it tried to PutFromFile with an empty
// path — making Close() fail on a file that was never touched. Sync is now
// a no-op in that case.
func (f *COSFile) Sync() error {
	// No temp file means no pending data: nothing to flush or upload.
	if f.tempFile == nil {
		return nil
	}
	if err := f.tempFile.Sync(); err != nil {
		return err
	}
	return f.uploadTempFile()
}
func (f *COSFile) Seek(offset int64, whence int) (int64, error) {
// 如果还没有创建临时文件,先创建或下载
if f.tempFile == nil {
if err := f.downloadToTemp(); err != nil {
return 0, err
}
}
// 使用临时文件进行随机访问
return f.tempFile.Seek(offset, whence)
}
func (f *COSFile) Close() error {
if err := f.Sync(); err != nil {
return err
}
if f.tempFile != nil {
f.tempFile.Close()
}
// 清理临时文件
if f.filePath != "" {
os.Remove(f.filePath)
}
return nil
}
// createTempFile 创建临时文件
func (f *COSFile) createTempFile() error {
// 创建临时文件
tempFile, err := os.CreateTemp("", "coswriter_*.tmp")
if err != nil {
return fmt.Errorf("failed to create temp file: %w", err)
}
f.tempFile = tempFile
f.filePath = tempFile.Name()
return nil
}
func (f *COSFile) Stat() (os.FileInfo, error) {
return f.tempFile.Stat()
}
// uploadTempFile 上传临时文件到COS
func (f *COSFile) uploadTempFile() (err error) {
// 上传到COS
_, err = f.storage.client.Object.PutFromFile(f.ctx, f.objectKey, f.filePath, nil)
if err != nil {
return fmt.Errorf("failed to upload to COS: %w", err)
}
return nil
}
// downloadToTemp 下载COS对象到本地临时文件
func (f *COSFile) downloadToTemp() error {
// 创建临时文件
tempFile, err := os.CreateTemp("", "cosreader_*.tmp")
if err != nil {
return fmt.Errorf("failed to create temp file: %w", err)
}
f.tempFile = tempFile
f.filePath = tempFile.Name()
// 下载COS对象
_, err = f.storage.client.Object.GetToFile(f.ctx, f.objectKey, f.filePath, nil)
if err != nil {
tempFile.Close()
os.Remove(f.filePath)
if strings.Contains(err.Error(), "404") || strings.Contains(err.Error(), "NoSuchKey") {
return ErrFileNotFound
}
return fmt.Errorf("failed to download from COS: %w", err)
}
// 重置文件指针到开始位置
_, err = tempFile.Seek(0, 0)
if err != nil {
tempFile.Close()
os.Remove(f.filePath)
return fmt.Errorf("failed to seek temp file: %w", err)
}
return nil
}
// init registers the COS storage factory (compiled only with -tags cos).
//
// Fix: the parameter was named `config`, shadowing the m7s config package,
// so `config.Parse(...)` was a method call on `any` and did not compile;
// NewCOSStorage also takes *COSStorageConfig, not a value. This now mirrors
// the working s3 factory registration.
func init() {
	Factory["cos"] = func(conf any) (Storage, error) {
		var cosConfig COSStorageConfig
		// Map the generic config map onto the typed struct.
		config.Parse(&cosConfig, conf.(map[string]any))
		return NewCOSStorage(&cosConfig)
	}
}
+3
View File
@@ -0,0 +1,3 @@
package storage

// Factory maps a storage type name ("local", "s3", "oss", "cos") to a
// constructor that builds a Storage from its raw configuration value
// (usually a map[string]any). Each backend registers itself here from the
// init() in its own (possibly build-tagged) file.
var Factory = map[string]func(any) (Storage, error){}
+136
View File
@@ -0,0 +1,136 @@
package storage
import (
"context"
"fmt"
"os"
"path/filepath"
)
// LocalStorageConfig 本地存储配置
type LocalStorageConfig string
func (c LocalStorageConfig) GetType() StorageType {
return StorageTypeLocal
}
func (c LocalStorageConfig) Validate() error {
if c == "" {
return fmt.Errorf("base_path is required for local storage")
}
return nil
}
// LocalStorage 本地存储实现
type LocalStorage struct {
basePath string
}
// NewLocalStorage 创建本地存储实例
func NewLocalStorage(config LocalStorageConfig) (*LocalStorage, error) {
if err := config.Validate(); err != nil {
return nil, err
}
basePath, err := filepath.Abs(string(config))
if err != nil {
return nil, fmt.Errorf("invalid base path: %w", err)
}
// 确保基础路径存在
if err := os.MkdirAll(basePath, 0755); err != nil {
return nil, fmt.Errorf("failed to create base path: %w", err)
}
return &LocalStorage{
basePath: basePath,
}, nil
}
// CreateFile creates (or truncates) a regular file at path, creating parent
// directories as needed, and returns the open *os.File as a File.
//
// NOTE(review): path is used as-is — s.basePath is never consulted here,
// even though NewLocalStorage resolves and creates basePath. Presumably
// callers pass full paths; confirm, or join with s.basePath for isolation.
func (s *LocalStorage) CreateFile(ctx context.Context, path string) (File, error) {
	// Ensure the parent directory exists.
	dir := filepath.Dir(path)
	if err := os.MkdirAll(dir, 0755); err != nil {
		return nil, fmt.Errorf("failed to create directory: %w", err)
	}
	file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
	if err != nil {
		return nil, fmt.Errorf("failed to create file: %w", err)
	}
	return file, nil
}
func (s *LocalStorage) Delete(ctx context.Context, path string) error {
return os.Remove(path)
}
func (s *LocalStorage) Exists(ctx context.Context, path string) (bool, error) {
_, err := os.Stat(path)
if err != nil {
if os.IsNotExist(err) {
return false, nil
}
return false, err
}
return true, nil
}
func (s *LocalStorage) GetSize(ctx context.Context, path string) (int64, error) {
info, err := os.Stat(path)
if err != nil {
if os.IsNotExist(err) {
return 0, ErrFileNotFound
}
return 0, err
}
return info.Size(), nil
}
func (s *LocalStorage) GetURL(ctx context.Context, path string) (string, error) {
// 本地存储返回文件路径
return path, nil
}
// List walks prefix recursively and returns FileInfo for every regular file
// found, with names made relative to prefix.
//
// NOTE(review): filepath.Join(prefix) with a single argument is just
// filepath.Clean(prefix); an empty prefix yields "" and filepath.Walk("")
// returns an error rather than listing the store. As with the other
// methods, s.basePath is not applied — confirm the intended semantics.
func (s *LocalStorage) List(ctx context.Context, prefix string) ([]FileInfo, error) {
	searchPath := filepath.Join(prefix)
	var files []FileInfo
	err := filepath.Walk(searchPath, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if !info.IsDir() {
			// Report paths relative to the requested prefix.
			relPath, err := filepath.Rel(prefix, path)
			if err != nil {
				return err
			}
			files = append(files, FileInfo{
				Name:         relPath,
				Size:         info.Size(),
				LastModified: info.ModTime(),
			})
		}
		return nil
	})
	return files, err
}
func (s *LocalStorage) Close() error {
// 本地存储无需关闭连接
return nil
}
func init() {
Factory["local"] = func(config any) (Storage, error) {
localConfig, ok := config.(string)
if !ok {
return nil, fmt.Errorf("invalid config type for local storage")
}
return NewLocalStorage(LocalStorageConfig(localConfig))
}
}
+358
View File
@@ -0,0 +1,358 @@
//go:build oss
package storage
import (
	"context"
	"fmt"
	"os"
	"strings"

	"github.com/aliyun/aliyun-oss-go-sdk/oss"

	"m7s.live/v5/pkg/config"
)
// OSSStorageConfig OSS存储配置
type OSSStorageConfig struct {
Endpoint string `yaml:"endpoint" desc:"OSS服务端点"`
AccessKeyID string `yaml:"access_key_id" desc:"OSS访问密钥ID"`
AccessKeySecret string `yaml:"access_key_secret" desc:"OSS访问密钥Secret"`
Bucket string `yaml:"bucket" desc:"OSS存储桶名称"`
PathPrefix string `yaml:"path_prefix" desc:"文件路径前缀"`
UseSSL bool `yaml:"use_ssl" desc:"是否使用SSL" default:"true"`
Timeout int `yaml:"timeout" desc:"上传超时时间(秒)" default:"30"`
}
func (c *OSSStorageConfig) GetType() StorageType {
return StorageTypeOSS
}
func (c *OSSStorageConfig) Validate() error {
if c.AccessKeyID == "" {
return fmt.Errorf("access_key_id is required for OSS storage")
}
if c.AccessKeySecret == "" {
return fmt.Errorf("access_key_secret is required for OSS storage")
}
if c.Bucket == "" {
return fmt.Errorf("bucket is required for OSS storage")
}
if c.Endpoint == "" {
return fmt.Errorf("endpoint is required for OSS storage")
}
return nil
}
// OSSStorage OSS存储实现
type OSSStorage struct {
config *OSSStorageConfig
client *oss.Client
bucket *oss.Bucket
}
// NewOSSStorage 创建OSS存储实例
func NewOSSStorage(config *OSSStorageConfig) (*OSSStorage, error) {
if err := config.Validate(); err != nil {
return nil, err
}
// 设置默认值
if config.Timeout == 0 {
config.Timeout = 30
}
// 创建OSS客户端
client, err := oss.New(config.Endpoint, config.AccessKeyID, config.AccessKeySecret)
if err != nil {
return nil, fmt.Errorf("failed to create OSS client: %w", err)
}
// 获取存储桶
bucket, err := client.Bucket(config.Bucket)
if err != nil {
return nil, fmt.Errorf("failed to get OSS bucket: %w", err)
}
// 测试连接
if err := testOSSConnection(bucket); err != nil {
return nil, fmt.Errorf("OSS connection test failed: %w", err)
}
return &OSSStorage{
config: config,
client: client,
bucket: bucket,
}, nil
}
func (s *OSSStorage) CreateFile(ctx context.Context, path string) (File, error) {
objectKey := s.getObjectKey(path)
return &OSSFile{
storage: s,
objectKey: objectKey,
ctx: ctx,
}, nil
}
func (s *OSSStorage) Delete(ctx context.Context, path string) error {
objectKey := s.getObjectKey(path)
return s.bucket.DeleteObject(objectKey)
}
func (s *OSSStorage) Exists(ctx context.Context, path string) (bool, error) {
objectKey := s.getObjectKey(path)
exists, err := s.bucket.IsObjectExist(objectKey)
if err != nil {
return false, err
}
return exists, nil
}
func (s *OSSStorage) GetSize(ctx context.Context, path string) (int64, error) {
objectKey := s.getObjectKey(path)
props, err := s.bucket.GetObjectDetailedMeta(objectKey)
if err != nil {
if strings.Contains(err.Error(), "NoSuchKey") {
return 0, ErrFileNotFound
}
return 0, err
}
contentLength := props.Get("Content-Length")
if contentLength == "" {
return 0, nil
}
var size int64
if _, err := fmt.Sscanf(contentLength, "%d", &size); err != nil {
return 0, fmt.Errorf("failed to parse content length: %w", err)
}
return size, nil
}
func (s *OSSStorage) GetURL(ctx context.Context, path string) (string, error) {
objectKey := s.getObjectKey(path)
// 生成签名URL24小时有效期
url, err := s.bucket.SignURL(objectKey, oss.HTTPGet, 24*3600)
if err != nil {
return "", err
}
return url, nil
}
func (s *OSSStorage) List(ctx context.Context, prefix string) ([]FileInfo, error) {
objectPrefix := s.getObjectKey(prefix)
var files []FileInfo
err := s.bucket.ListObjects(oss.Prefix(objectPrefix), func(result oss.ListObjectsResult) error {
for _, obj := range result.Objects {
// 移除路径前缀
fileName := obj.Key
if s.config.PathPrefix != "" {
fileName = strings.TrimPrefix(fileName, strings.TrimSuffix(s.config.PathPrefix, "/")+"/")
}
files = append(files, FileInfo{
Name: fileName,
Size: obj.Size,
LastModified: obj.LastModified,
ETag: obj.ETag,
})
}
return nil
})
return files, err
}
func (s *OSSStorage) Close() error {
// OSS客户端无需显式关闭
return nil
}
// getObjectKey 获取OSS对象键
func (s *OSSStorage) getObjectKey(path string) string {
if s.config.PathPrefix != "" {
return strings.TrimSuffix(s.config.PathPrefix, "/") + "/" + path
}
return path
}
// testOSSConnection 测试OSS连接
func testOSSConnection(bucket *oss.Bucket) error {
// 尝试列出对象来测试连接
_, err := bucket.ListObjects(oss.MaxKeys(1))
return err
}
// OSSFile OSS文件读写器
type OSSFile struct {
storage *OSSStorage
objectKey string
ctx context.Context
tempFile *os.File // 本地临时文件,用于支持随机访问
filePath string // 临时文件路径
}
func (f *OSSFile) Name() string {
return f.objectKey
}
func (f *OSSFile) Write(p []byte) (n int, err error) {
// 如果还没有创建临时文件,先创建
if f.tempFile == nil {
if err = f.createTempFile(); err != nil {
return 0, err
}
}
// 写入到临时文件
return f.tempFile.Write(p)
}
func (f *OSSFile) Read(p []byte) (n int, err error) {
// 如果还没有创建缓存文件,先下载到本地
if f.tempFile == nil {
if err = f.downloadToTemp(); err != nil {
return 0, err
}
}
// 从本地缓存文件读取
return f.tempFile.Read(p)
}
func (f *OSSFile) WriteAt(p []byte, off int64) (n int, err error) {
// 如果还没有创建临时文件,先创建
if f.tempFile == nil {
if err = f.createTempFile(); err != nil {
return 0, err
}
}
// 写入到临时文件的指定位置
return f.tempFile.WriteAt(p, off)
}
func (f *OSSFile) ReadAt(p []byte, off int64) (n int, err error) {
// 如果还没有创建缓存文件,先下载到本地
if f.tempFile == nil {
if err = f.downloadToTemp(); err != nil {
return 0, err
}
}
// 从本地缓存文件的指定位置读取
return f.tempFile.ReadAt(p, off)
}
// Sync flushes the local temp file to disk and uploads it to OSS.
//
// Fix: the original called uploadTempFile even when no temp file existed
// (nothing written or read yet), attempting PutObjectFromFile with an empty
// path — making Close() fail on a file that was never touched. Sync is now
// a no-op in that case.
func (f *OSSFile) Sync() error {
	// No temp file means no pending data: nothing to flush or upload.
	if f.tempFile == nil {
		return nil
	}
	if err := f.tempFile.Sync(); err != nil {
		return err
	}
	return f.uploadTempFile()
}
func (f *OSSFile) Seek(offset int64, whence int) (int64, error) {
// 如果还没有创建临时文件,先创建或下载
if f.tempFile == nil {
if err := f.downloadToTemp(); err != nil {
return 0, err
}
}
// 使用临时文件进行随机访问
return f.tempFile.Seek(offset, whence)
}
func (f *OSSFile) Close() error {
if err := f.Sync(); err != nil {
return err
}
if f.tempFile != nil {
f.tempFile.Close()
}
// 清理临时文件
if f.filePath != "" {
os.Remove(f.filePath)
}
return nil
}
// createTempFile 创建临时文件
func (f *OSSFile) createTempFile() error {
// 创建临时文件
tempFile, err := os.CreateTemp("", "osswriter_*.tmp")
if err != nil {
return fmt.Errorf("failed to create temp file: %w", err)
}
f.tempFile = tempFile
f.filePath = tempFile.Name()
return nil
}
func (f *OSSFile) Stat() (os.FileInfo, error) {
return f.tempFile.Stat()
}
// uploadTempFile 上传临时文件到OSS
func (f *OSSFile) uploadTempFile() (err error) {
// 上传到OSS
err = f.storage.bucket.PutObjectFromFile(f.objectKey, f.filePath)
if err != nil {
return fmt.Errorf("failed to upload to OSS: %w", err)
}
return nil
}
// downloadToTemp 下载OSS对象到本地临时文件
func (f *OSSFile) downloadToTemp() error {
// 创建临时文件
tempFile, err := os.CreateTemp("", "ossreader_*.tmp")
if err != nil {
return fmt.Errorf("failed to create temp file: %w", err)
}
f.tempFile = tempFile
f.filePath = tempFile.Name()
// 下载OSS对象
err = f.storage.bucket.GetObjectToFile(f.objectKey, f.filePath)
if err != nil {
tempFile.Close()
os.Remove(f.filePath)
if strings.Contains(err.Error(), "NoSuchKey") {
return ErrFileNotFound
}
return fmt.Errorf("failed to download from OSS: %w", err)
}
// 重置文件指针到开始位置
_, err = tempFile.Seek(0, 0)
if err != nil {
tempFile.Close()
os.Remove(f.filePath)
return fmt.Errorf("failed to seek temp file: %w", err)
}
return nil
}
// init registers the OSS storage factory (compiled only with -tags oss).
//
// Fix: the parameter was named `config`, shadowing the m7s config package,
// so `config.Parse(...)` was a method call on `any` and did not compile;
// NewOSSStorage also takes *OSSStorageConfig, not a value. This now mirrors
// the working s3 factory registration.
func init() {
	Factory["oss"] = func(conf any) (Storage, error) {
		var ossConfig OSSStorageConfig
		// Map the generic config map onto the typed struct.
		config.Parse(&ossConfig, conf.(map[string]any))
		return NewOSSStorage(&ossConfig)
	}
}
+410
View File
@@ -0,0 +1,410 @@
//go:build s3
package storage
import (
"context"
"fmt"
"os"
"strings"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"m7s.live/v5/pkg/config"
)
// S3StorageConfig S3存储配置
type S3StorageConfig struct {
Endpoint string `desc:"S3服务端点"`
Region string `desc:"AWS区域" default:"us-east-1"`
AccessKeyID string `desc:"S3访问密钥ID"`
SecretAccessKey string `desc:"S3秘密访问密钥"`
Bucket string `desc:"S3存储桶名称"`
PathPrefix string `desc:"文件路径前缀"`
ForcePathStyle bool `desc:"强制路径样式(MinIO需要)"`
UseSSL bool `desc:"是否使用SSL" default:"true"`
Timeout time.Duration `desc:"上传超时时间" default:"30s"`
}
func (c *S3StorageConfig) GetType() StorageType {
return StorageTypeS3
}
func (c *S3StorageConfig) Validate() error {
if c.AccessKeyID == "" {
return fmt.Errorf("access_key_id is required for S3 storage")
}
if c.SecretAccessKey == "" {
return fmt.Errorf("secret_access_key is required for S3 storage")
}
if c.Bucket == "" {
return fmt.Errorf("bucket is required for S3 storage")
}
return nil
}
// S3Storage S3存储实现
type S3Storage struct {
config *S3StorageConfig
s3Client *s3.S3
uploader *s3manager.Uploader
downloader *s3manager.Downloader
}
// NewS3Storage 创建S3存储实例
func NewS3Storage(config *S3StorageConfig) (*S3Storage, error) {
if err := config.Validate(); err != nil {
return nil, err
}
// 创建AWS配置
awsConfig := &aws.Config{
Region: aws.String(config.Region),
Credentials: credentials.NewStaticCredentials(config.AccessKeyID, config.SecretAccessKey, ""),
S3ForcePathStyle: aws.Bool(config.ForcePathStyle),
}
// 设置端点(用于MinIO或其他S3兼容服务)
if config.Endpoint != "" {
endpoint := config.Endpoint
if !strings.HasPrefix(endpoint, "http") {
protocol := "http"
if config.UseSSL {
protocol = "https"
}
endpoint = protocol + "://" + endpoint
}
awsConfig.Endpoint = aws.String(endpoint)
awsConfig.DisableSSL = aws.Bool(!config.UseSSL)
}
// 创建AWS会话
sess, err := session.NewSession(awsConfig)
if err != nil {
return nil, fmt.Errorf("failed to create AWS session: %w", err)
}
// 创建S3客户端
s3Client := s3.New(sess)
// 测试连接
if err := testS3Connection(s3Client, config.Bucket); err != nil {
return nil, fmt.Errorf("S3 connection test failed: %w", err)
}
return &S3Storage{
config: config,
s3Client: s3Client,
uploader: s3manager.NewUploader(sess),
downloader: s3manager.NewDownloader(sess),
}, nil
}
func (s *S3Storage) CreateFile(ctx context.Context, path string) (File, error) {
objectKey := s.getObjectKey(path)
return &S3File{
storage: s,
objectKey: objectKey,
ctx: ctx,
}, nil
}
func (s *S3Storage) Delete(ctx context.Context, path string) error {
objectKey := s.getObjectKey(path)
_, err := s.s3Client.DeleteObjectWithContext(ctx, &s3.DeleteObjectInput{
Bucket: aws.String(s.config.Bucket),
Key: aws.String(objectKey),
})
return err
}
func (s *S3Storage) Exists(ctx context.Context, path string) (bool, error) {
objectKey := s.getObjectKey(path)
_, err := s.s3Client.HeadObjectWithContext(ctx, &s3.HeadObjectInput{
Bucket: aws.String(s.config.Bucket),
Key: aws.String(objectKey),
})
if err != nil {
// 检查是否是404错误
if strings.Contains(err.Error(), "NotFound") || strings.Contains(err.Error(), "NoSuchKey") {
return false, nil
}
return false, err
}
return true, nil
}
func (s *S3Storage) GetSize(ctx context.Context, path string) (int64, error) {
objectKey := s.getObjectKey(path)
result, err := s.s3Client.HeadObjectWithContext(ctx, &s3.HeadObjectInput{
Bucket: aws.String(s.config.Bucket),
Key: aws.String(objectKey),
})
if err != nil {
if strings.Contains(err.Error(), "NotFound") || strings.Contains(err.Error(), "NoSuchKey") {
return 0, ErrFileNotFound
}
return 0, err
}
if result.ContentLength == nil {
return 0, nil
}
return *result.ContentLength, nil
}
func (s *S3Storage) GetURL(ctx context.Context, path string) (string, error) {
objectKey := s.getObjectKey(path)
req, _ := s.s3Client.GetObjectRequest(&s3.GetObjectInput{
Bucket: aws.String(s.config.Bucket),
Key: aws.String(objectKey),
})
url, err := req.Presign(24 * time.Hour) // 24小时有效期
if err != nil {
return "", err
}
return url, nil
}
func (s *S3Storage) List(ctx context.Context, prefix string) ([]FileInfo, error) {
objectPrefix := s.getObjectKey(prefix)
var files []FileInfo
err := s.s3Client.ListObjectsV2PagesWithContext(ctx, &s3.ListObjectsV2Input{
Bucket: aws.String(s.config.Bucket),
Prefix: aws.String(objectPrefix),
}, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
for _, obj := range page.Contents {
// 移除路径前缀
fileName := *obj.Key
if s.config.PathPrefix != "" {
fileName = strings.TrimPrefix(fileName, strings.TrimSuffix(s.config.PathPrefix, "/")+"/")
}
files = append(files, FileInfo{
Name: fileName,
Size: *obj.Size,
LastModified: *obj.LastModified,
ETag: *obj.ETag,
})
}
return true
})
return files, err
}
func (s *S3Storage) Close() error {
// S3客户端无需显式关闭
return nil
}
// getObjectKey 获取S3对象键
func (s *S3Storage) getObjectKey(path string) string {
if s.config.PathPrefix != "" {
return strings.TrimSuffix(s.config.PathPrefix, "/") + "/" + path
}
return path
}
// testS3Connection 测试S3连接
func testS3Connection(s3Client *s3.S3, bucket string) error {
_, err := s3Client.HeadBucket(&s3.HeadBucketInput{
Bucket: aws.String(bucket),
})
return err
}
// S3File S3文件读写器
type S3File struct {
storage *S3Storage
objectKey string
ctx context.Context
tempFile *os.File // 本地临时文件,用于支持随机访问
filePath string // 临时文件路径
}
func (w *S3File) Name() string {
return w.objectKey
}
func (w *S3File) Write(p []byte) (n int, err error) {
// 如果还没有创建临时文件,先创建
if w.tempFile == nil {
if err = w.createTempFile(); err != nil {
return 0, err
}
}
// 写入到临时文件
return w.tempFile.Write(p)
}
func (w *S3File) Read(p []byte) (n int, err error) {
// 如果还没有创建缓存文件,先下载到本地
if w.tempFile == nil {
if err = w.downloadToTemp(); err != nil {
return 0, err
}
}
// 从本地缓存文件读取
return w.tempFile.Read(p)
}
func (w *S3File) WriteAt(p []byte, off int64) (n int, err error) {
// 如果还没有创建临时文件,先创建
if w.tempFile == nil {
if err = w.createTempFile(); err != nil {
return 0, err
}
}
// 写入到临时文件的指定位置
return w.tempFile.WriteAt(p, off)
}
func (w *S3File) ReadAt(p []byte, off int64) (n int, err error) {
// 如果还没有创建缓存文件,先下载到本地
if w.tempFile == nil {
if err = w.downloadToTemp(); err != nil {
return 0, err
}
}
// 从本地缓存文件的指定位置读取
return w.tempFile.ReadAt(p, off)
}
// Sync flushes the local temp file to disk and uploads it to S3.
//
// Fix: the original called uploadTempFile even when no temp file existed
// (nothing written or read yet), handing the uploader a nil Body — making
// Close() misbehave on a file that was never touched. Sync is now a no-op
// in that case.
func (w *S3File) Sync() error {
	// No temp file means no pending data: nothing to flush or upload.
	if w.tempFile == nil {
		return nil
	}
	if err := w.tempFile.Sync(); err != nil {
		return err
	}
	return w.uploadTempFile()
}
func (w *S3File) Seek(offset int64, whence int) (int64, error) {
// 如果还没有创建临时文件,先创建或下载
if w.tempFile == nil {
if err := w.downloadToTemp(); err != nil {
return 0, err
}
}
// 使用临时文件进行随机访问
return w.tempFile.Seek(offset, whence)
}
func (w *S3File) Close() error {
if err := w.Sync(); err != nil {
return err
}
if w.tempFile != nil {
w.tempFile.Close()
}
// 清理临时文件
if w.filePath != "" {
os.Remove(w.filePath)
}
return nil
}
// createTempFile allocates the local scratch file that backs writes to
// this S3 object and records its path for later cleanup.
func (w *S3File) createTempFile() error {
	f, err := os.CreateTemp("", "s3writer_*.tmp")
	if err != nil {
		return fmt.Errorf("failed to create temp file: %w", err)
	}
	w.tempFile, w.filePath = f, f.Name()
	return nil
}
// Stat reports metadata of the local copy of the object. It returns
// os.ErrNotExist when no local copy has been created yet — previously
// a nil tempFile surfaced the opaque os.ErrInvalid from *os.File.
func (w *S3File) Stat() (os.FileInfo, error) {
	if w.tempFile == nil {
		return nil, os.ErrNotExist
	}
	return w.tempFile.Stat()
}
// uploadTempFile uploads the full contents of the local temp file to
// S3 under this file's object key.
func (w *S3File) uploadTempFile() (err error) {
	// Nothing buffered locally: nothing to upload.
	if w.tempFile == nil {
		return nil
	}
	// Rewind to the start of the temp file. After Write the cursor sits
	// at EOF, and the uploader determines the body length from the
	// current offset — without this seek the uploaded object would be
	// truncated (usually empty).
	if _, err = w.tempFile.Seek(0, 0); err != nil {
		return fmt.Errorf("failed to seek temp file: %w", err)
	}
	// Upload to S3.
	_, err = w.storage.uploader.UploadWithContext(w.ctx, &s3manager.UploadInput{
		Bucket:      aws.String(w.storage.config.Bucket),
		Key:         aws.String(w.objectKey),
		Body:        w.tempFile,
		ContentType: aws.String("application/octet-stream"),
	})
	if err != nil {
		return fmt.Errorf("failed to upload to S3: %w", err)
	}
	return nil
}
// downloadToTemp fetches the S3 object into a fresh local temp file
// and leaves the cursor at the beginning, ready for sequential reads.
// On failure the temp file is removed and the receiver's state is
// reset, so a later Read/Seek can retry the download — previously
// w.tempFile was left pointing at a closed, deleted file after an
// error, and every subsequent call silently reused that dead handle.
func (w *S3File) downloadToTemp() error {
	tempFile, err := os.CreateTemp("", "s3reader_*.tmp")
	if err != nil {
		return fmt.Errorf("failed to create temp file: %w", err)
	}
	// cleanup discards the temp file and resets the receiver so the
	// next access attempts a fresh download.
	cleanup := func() {
		tempFile.Close()
		os.Remove(tempFile.Name())
		w.tempFile = nil
		w.filePath = ""
	}
	w.tempFile = tempFile
	w.filePath = tempFile.Name()
	// Download the S3 object into the temp file.
	_, err = w.storage.downloader.DownloadWithContext(w.ctx, tempFile, &s3.GetObjectInput{
		Bucket: aws.String(w.storage.config.Bucket),
		Key:    aws.String(w.objectKey),
	})
	if err != nil {
		cleanup()
		// Map the SDK's "missing object" errors onto the package-level
		// sentinel so callers can test errors.Is(err, ErrFileNotFound).
		if strings.Contains(err.Error(), "NotFound") || strings.Contains(err.Error(), "NoSuchKey") {
			return ErrFileNotFound
		}
		return fmt.Errorf("failed to download from S3: %w", err)
	}
	// Rewind so the first Read starts at the beginning of the object.
	if _, err = tempFile.Seek(0, 0); err != nil {
		cleanup()
		return fmt.Errorf("failed to seek temp file: %w", err)
	}
	return nil
}
// init registers the S3 backend in the storage Factory so that
// CreateStorage("s3", conf) can build an S3Storage from a raw
// configuration map.
func init() {
	Factory["s3"] = func(conf any) (Storage, error) {
		var s3Config S3StorageConfig
		// NOTE(review): conf.(map[string]any) panics on a non-map value —
		// assumes callers always pass a decoded YAML/JSON mapping; confirm.
		config.Parse(&s3Config, conf.(map[string]any))
		return NewS3Storage(&s3Config)
	}
}
+100
View File
@@ -0,0 +1,100 @@
package storage
import (
"context"
"fmt"
"io"
"os"
"time"
)
// StorageType identifies a storage backend implementation.
type StorageType string

// Known storage backend type identifiers.
const (
	StorageTypeLocal StorageType = "local"
	StorageTypeS3    StorageType = "s3"
	StorageTypeOSS   StorageType = "oss"
	StorageTypeCOS   StorageType = "cos"
)
// StorageConfig is implemented by backend-specific configuration types.
type StorageConfig interface {
	// GetType returns the backend type this configuration targets.
	GetType() StorageType
	// Validate checks the configuration for completeness and consistency.
	Validate() error
}
// Storage abstracts a file storage backend (local disk, S3, ...).
type Storage interface {
	// CreateFile opens the file at path for reading and writing,
	// creating it if necessary.
	CreateFile(ctx context.Context, path string) (File, error)
	// Delete removes the file at path.
	Delete(ctx context.Context, path string) error
	// Exists reports whether a file exists at path.
	Exists(ctx context.Context, path string) (bool, error)
	// GetSize returns the size in bytes of the file at path.
	GetSize(ctx context.Context, path string) (int64, error)
	// GetURL returns a URL under which the file at path can be accessed.
	GetURL(ctx context.Context, path string) (string, error)
	// List enumerates files whose names start with prefix.
	List(ctx context.Context, prefix string) ([]FileInfo, error)
	// Close releases any resources held by the storage connection.
	Close() error
}
// Writer is the write side of a stored file: sequential and positional
// writes plus explicit flushing to the backend.
type Writer interface {
	io.Writer
	io.WriterAt
	io.Closer
	// Sync flushes buffered data to the underlying storage.
	Sync() error
	// Seek sets the position for the next Write.
	Seek(offset int64, whence int) (int64, error)
}
// Reader is the read side of a stored file: sequential, positional and
// seekable reads.
type Reader interface {
	io.Reader
	io.ReaderAt
	io.Closer
	io.Seeker
}
// File combines read and write access to a single stored object and
// exposes basic metadata.
type File interface {
	Writer
	Reader
	// Stat returns file metadata.
	Stat() (os.FileInfo, error)
	// Name returns the file's name or object key within the backend.
	Name() string
}
// FileInfo describes a stored file, as returned by Storage.List.
type FileInfo struct {
	Name         string    `json:"name"`                   // file name or object key
	Size         int64     `json:"size"`                   // size in bytes
	LastModified time.Time `json:"last_modified"`          // last modification time
	ETag         string    `json:"etag,omitempty"`         // backend entity tag, if any
	ContentType  string    `json:"content_type,omitempty"` // MIME type, if known
}
// CreateStorage instantiates the storage backend registered under the
// given type name, returning ErrUnsupportedStorageType when no factory
// has been registered for it.
func CreateStorage(t string, config any) (Storage, error) {
	if factory, ok := Factory[t]; ok {
		return factory(config)
	}
	return nil, ErrUnsupportedStorageType
}
// Sentinel errors shared by all storage backends.
// Compare with errors.Is rather than by message string.
var (
	ErrUnsupportedStorageType = fmt.Errorf("unsupported storage type") // no factory registered for the requested type
	ErrFileNotFound           = fmt.Errorf("file not found")           // object/file does not exist in the backend
	ErrStorageNotAvailable    = fmt.Errorf("storage not available")    // backend unreachable or misconfigured
)
+24 -7
View File
@@ -1,6 +1,7 @@
package flv
import (
"context"
"fmt"
"io"
"os"
@@ -10,6 +11,7 @@ import (
"m7s.live/v5"
"m7s.live/v5/pkg"
"m7s.live/v5/pkg/config"
"m7s.live/v5/pkg/storage"
"m7s.live/v5/pkg/task"
rtmp "m7s.live/v5/plugin/rtmp/pkg"
)
@@ -26,7 +28,7 @@ func init() {
type writeMetaTagTask struct {
task.Task
file *os.File
file storage.File
writer *FlvWriter
flags byte
metaData []byte
@@ -76,7 +78,7 @@ func (task *writeMetaTagTask) Start() (err error) {
}
}
func writeMetaTag(file *os.File, suber *m7s.Subscriber, filepositions []uint64, times []float64, duration *int64) {
func writeMetaTag(file storage.File, suber *m7s.Subscriber, filepositions []uint64, times []float64, duration *int64) {
ar, vr := suber.AudioReader, suber.VideoReader
hasAudio, hasVideo := ar != nil, vr != nil
var amf rtmp.AMF
@@ -142,7 +144,7 @@ func NewRecorder(conf config.Record) m7s.IRecorder {
type Recorder struct {
m7s.DefaultRecorder
writer *FlvWriter
file *os.File
file storage.File
}
var CustomFileName = func(job *m7s.RecordJob) string {
@@ -158,11 +160,26 @@ func (r *Recorder) createStream(start time.Time) (err error) {
if err != nil {
return
}
if r.file, err = os.OpenFile(r.Event.FilePath, os.O_CREATE|os.O_RDWR, 0666); err != nil {
return
// 获取存储实例
storage := r.RecordJob.GetStorage()
if storage != nil {
// 使用存储抽象层
r.file, err = storage.CreateFile(context.Background(), r.Event.FilePath)
if err != nil {
return
}
r.writer = NewFlvWriter(r.file)
} else {
// 默认本地文件行为
if r.file, err = os.OpenFile(r.Event.FilePath, os.O_CREATE|os.O_RDWR, 0666); err != nil {
return
}
r.writer = NewFlvWriter(r.file)
}
_, err = r.file.Write(FLVHead)
r.writer = NewFlvWriter(r.file)
_, err = r.writer.Write(FLVHead)
if err != nil {
return
}
+23 -21
View File
@@ -1,6 +1,7 @@
package hls
import (
"context"
"fmt"
"path/filepath"
"time"
@@ -10,6 +11,7 @@ import (
"m7s.live/v5/pkg/config"
"m7s.live/v5/pkg/format"
mpegts "m7s.live/v5/pkg/format/ts"
"m7s.live/v5/pkg/storage"
)
func NewRecorder(conf config.Record) m7s.IRecorder {
@@ -18,7 +20,8 @@ func NewRecorder(conf config.Record) m7s.IRecorder {
type Recorder struct {
m7s.DefaultRecorder
ts TsInFile
TsInMemory
file storage.File
segmentCount uint32
lastTs time.Duration
firstSegment bool
@@ -38,12 +41,15 @@ func (r *Recorder) createStream(start time.Time) (err error) {
func (r *Recorder) writeTailer(end time.Time) {
if !r.RecordJob.RecConf.RealTime {
if r.ts.file != nil {
r.ts.WriteTo(r.ts.file)
r.ts.Recycle()
defer r.TsInMemory.Recycle()
var err error
r.file, err = r.RecordJob.GetStorage().CreateFile(context.Background(), r.Event.FilePath)
if err != nil {
return
}
r.WriteTo(r.file)
}
r.ts.Close()
r.file.Close()
r.WriteTail(end, nil)
}
@@ -53,11 +59,7 @@ func (r *Recorder) Dispose() {
func (r *Recorder) createNewTs() (err error) {
if r.RecordJob.RecConf.RealTime {
if err = r.ts.Open(r.Event.FilePath); err != nil {
r.Error("create ts file failed", "err", err, "path", r.Event.FilePath)
}
} else {
r.ts.path = r.Event.FilePath
r.file, err = r.RecordJob.GetStorage().CreateFile(context.Background(), r.Event.FilePath)
}
return
}
@@ -89,8 +91,8 @@ func (r *Recorder) writeSegment(ts time.Duration, writeTime time.Time) (err erro
return
}
if r.RecordJob.RecConf.RealTime {
r.ts.file.Write(mpegts.DefaultPATPacket)
r.ts.file.Write(r.ts.PMT)
r.file.Write(mpegts.DefaultPATPacket)
r.file.Write(r.PMT)
}
r.segmentCount++
r.lastTs = ts
@@ -121,18 +123,18 @@ func (r *Recorder) Run() (err error) {
if suber.Publisher.HasVideoTrack() {
videoCodec = suber.Publisher.VideoTrack.FourCC()
}
r.ts.WritePMTPacket(audioCodec, videoCodec)
r.WritePMTPacket(audioCodec, videoCodec)
if ctx.RecConf.RealTime {
r.ts.file.Write(mpegts.DefaultPATPacket)
r.ts.file.Write(r.ts.PMT)
r.file.Write(mpegts.DefaultPATPacket)
r.file.Write(r.PMT)
}
return m7s.PlayBlock(suber, func(audio *format.Mpeg2Audio) (err error) {
pesAudio.Pts = uint64(suber.AudioReader.AbsTime) * 90
err = pesAudio.WritePESPacket(audio.Memory, &r.ts.RecyclableMemory)
err = pesAudio.WritePESPacket(audio.Memory, &r.RecyclableMemory)
if err == nil {
if ctx.RecConf.RealTime {
r.ts.RecyclableMemory.WriteTo(r.ts.file)
r.ts.RecyclableMemory.Recycle()
r.RecyclableMemory.WriteTo(r.file)
r.RecyclableMemory.Recycle()
}
}
return
@@ -146,11 +148,11 @@ func (r *Recorder) Run() (err error) {
pesVideo.IsKeyFrame = video.IDR
pesVideo.Pts = uint64(vr.AbsTime+video.GetCTS32()) * 90
pesVideo.Dts = uint64(vr.AbsTime) * 90
err = pesVideo.WritePESPacket(video.Memory, &r.ts.RecyclableMemory)
err = pesVideo.WritePESPacket(video.Memory, &r.RecyclableMemory)
if err == nil {
if ctx.RecConf.RealTime {
r.ts.RecyclableMemory.WriteTo(r.ts.file)
r.ts.RecyclableMemory.Recycle()
r.RecyclableMemory.WriteTo(r.file)
r.RecyclableMemory.Recycle()
}
}
return
+2 -2
View File
@@ -3,8 +3,8 @@ package mp4
import (
"encoding/binary"
"io"
"os"
"m7s.live/v5/pkg/storage"
. "m7s.live/v5/plugin/mp4/pkg/box"
)
@@ -252,7 +252,7 @@ func (m *Muxer) WriteMoov(w io.Writer) (err error) {
return
}
func (m *Muxer) WriteTrailer(file *os.File) (err error) {
func (m *Muxer) WriteTrailer(file storage.Writer) (err error) {
if m.isFragment() {
// Flush any remaining samples
// if err = m.flushFragment(file); err != nil {
+28 -13
View File
@@ -1,6 +1,7 @@
package mp4
import (
"context"
"fmt"
"io"
"os"
@@ -11,9 +12,9 @@ import (
"m7s.live/v5/pkg"
"m7s.live/v5/pkg/codec"
"m7s.live/v5/pkg/config"
"m7s.live/v5/pkg/storage"
"m7s.live/v5/pkg/task"
"m7s.live/v5/plugin/mp4/pkg/box"
s3plugin "m7s.live/v5/plugin/s3"
)
type WriteTrailerQueueTask struct {
@@ -25,7 +26,7 @@ var writeTrailerQueueTask WriteTrailerQueueTask
type writeTrailerTask struct {
task.Task
muxer *Muxer
file *os.File
file storage.File
filePath string
}
@@ -104,13 +105,7 @@ func (t *writeTrailerTask) Run() (err error) {
if err = temp.Close(); err != nil {
t.Error("close temp file", "err", err)
}
// MP4文件处理完成后,触发S3上传
if t.filePath != "" {
t.Info("MP4 file processing completed, triggering S3 upload", "filePath", t.filePath)
s3plugin.TriggerUpload(t.filePath, false) // 不删除本地文件,让用户配置决定
}
return
}
@@ -125,7 +120,7 @@ func NewRecorder(conf config.Record) m7s.IRecorder {
type Recorder struct {
m7s.DefaultRecorder
muxer *Muxer
file *os.File
file storage.File
}
func (r *Recorder) writeTailer(end time.Time) {
@@ -152,15 +147,30 @@ func (r *Recorder) createStream(start time.Time) (err error) {
if err != nil {
return
}
r.file, err = os.Create(r.Event.FilePath)
if err != nil {
return
// 获取存储实例
storage := r.RecordJob.GetStorage()
if storage != nil {
// 使用存储抽象层
r.file, err = storage.CreateFile(context.Background(), r.Event.FilePath)
if err != nil {
return
}
} else {
// 默认本地文件行为
r.file, err = os.Create(r.Event.FilePath)
if err != nil {
return
}
}
if r.Event.Type == "fmp4" {
r.muxer = NewMuxerWithStreamPath(FLAG_FRAGMENT, r.Event.StreamPath)
} else {
r.muxer = NewMuxerWithStreamPath(0, r.Event.StreamPath)
}
return r.muxer.WriteInitSegment(r.file)
}
@@ -168,6 +178,11 @@ func (r *Recorder) Dispose() {
if r.muxer != nil {
r.writeTailer(time.Now())
}
// 关闭存储写入器
if r.file != nil {
r.file.Close()
}
}
func (r *Recorder) Run() (err error) {
-196
View File
@@ -1,196 +0,0 @@
package plugin_s3
import (
"bytes"
"context"
"fmt"
"net/http"
"strings"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"google.golang.org/protobuf/types/known/emptypb"
gpb "m7s.live/v5/pb"
"m7s.live/v5/plugin/s3/pb"
)
// Upload implements the gRPC Upload method
func (p *S3Plugin) Upload(ctx context.Context, req *pb.UploadRequest) (*pb.UploadResponse, error) {
if req.Filename == "" {
return nil, fmt.Errorf("filename is required")
}
if len(req.Content) == 0 {
return nil, fmt.Errorf("content is required")
}
bucket := req.Bucket
if bucket == "" {
bucket = p.Bucket
}
// Generate S3 key
key := req.Filename
if !strings.HasPrefix(key, "/") {
key = "/" + key
}
// Determine content type
contentType := req.ContentType
if contentType == "" {
contentType = http.DetectContentType(req.Content)
}
// Upload to S3
input := &s3.PutObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
Body: bytes.NewReader(req.Content),
ContentLength: aws.Int64(int64(len(req.Content))),
ContentType: aws.String(contentType),
}
result, err := p.s3Client.PutObjectWithContext(ctx, input)
if err != nil {
p.Error("Failed to upload file to S3", "error", err, "key", key, "bucket", bucket)
return nil, fmt.Errorf("failed to upload file: %v", err)
}
// Generate public URL
url := fmt.Sprintf("%s/%s%s", p.getEndpointURL(), bucket, key)
p.Info("File uploaded successfully", "key", key, "bucket", bucket, "size", len(req.Content))
return &pb.UploadResponse{
Code: 0,
Message: "Upload successful",
Data: &pb.UploadData{
Key: key,
Url: url,
Size: int64(len(req.Content)),
Etag: aws.StringValue(result.ETag),
},
}, nil
}
// List implements the gRPC List method
func (p *S3Plugin) List(ctx context.Context, req *pb.ListRequest) (*pb.ListResponse, error) {
bucket := req.Bucket
if bucket == "" {
bucket = p.Bucket
}
input := &s3.ListObjectsInput{
Bucket: aws.String(bucket),
}
if req.Prefix != "" {
input.Prefix = aws.String(req.Prefix)
}
if req.MaxKeys > 0 {
input.MaxKeys = aws.Int64(int64(req.MaxKeys))
}
if req.Marker != "" {
input.Marker = aws.String(req.Marker)
}
result, err := p.s3Client.ListObjectsWithContext(ctx, input)
if err != nil {
p.Error("Failed to list objects from S3", "error", err, "bucket", bucket)
return nil, fmt.Errorf("failed to list objects: %v", err)
}
var objects []*pb.S3Object
for _, obj := range result.Contents {
objects = append(objects, &pb.S3Object{
Key: aws.StringValue(obj.Key),
Size: aws.Int64Value(obj.Size),
LastModified: obj.LastModified.Format(time.RFC3339),
Etag: aws.StringValue(obj.ETag),
StorageClass: aws.StringValue(obj.StorageClass),
})
}
var nextMarker string
if result.NextMarker != nil {
nextMarker = aws.StringValue(result.NextMarker)
}
p.Info("Listed objects successfully", "bucket", bucket, "count", len(objects))
return &pb.ListResponse{
Code: 0,
Message: "List successful",
Data: &pb.ListData{
Objects: objects,
IsTruncated: aws.BoolValue(result.IsTruncated),
NextMarker: nextMarker,
},
}, nil
}
// Delete implements the gRPC Delete method
func (p *S3Plugin) Delete(ctx context.Context, req *pb.DeleteRequest) (*gpb.SuccessResponse, error) {
if req.Key == "" {
return nil, fmt.Errorf("key is required")
}
bucket := req.Bucket
if bucket == "" {
bucket = p.Bucket
}
input := &s3.DeleteObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(req.Key),
}
_, err := p.s3Client.DeleteObjectWithContext(ctx, input)
if err != nil {
p.Error("Failed to delete object from S3", "error", err, "key", req.Key, "bucket", bucket)
return nil, fmt.Errorf("failed to delete object: %v", err)
}
p.Info("Object deleted successfully", "key", req.Key, "bucket", bucket)
return &gpb.SuccessResponse{
Code: 0,
Message: "Delete successful",
}, nil
}
// CheckConnection implements the gRPC CheckConnection method
func (p *S3Plugin) CheckConnection(ctx context.Context, req *emptypb.Empty) (*pb.ConnectionResponse, error) {
// Test connection by listing buckets
_, err := p.s3Client.ListBucketsWithContext(ctx, &s3.ListBucketsInput{})
connected := err == nil
message := "Connection successful"
if err != nil {
message = fmt.Sprintf("Connection failed: %v", err)
p.Error("S3 connection check failed", "error", err)
} else {
p.Info("S3 connection check successful")
}
return &pb.ConnectionResponse{
Code: 0,
Message: message,
Data: &pb.ConnectionData{
Connected: connected,
Endpoint: p.Endpoint,
Region: p.Region,
UseSsl: p.UseSSL,
Bucket: p.Bucket,
},
}, nil
}
// Helper method to get endpoint URL
func (p *S3Plugin) getEndpointURL() string {
protocol := "http"
if p.UseSSL {
protocol = "https"
}
return fmt.Sprintf("%s://%s", protocol, p.Endpoint)
}
-236
View File
@@ -1,236 +0,0 @@
package plugin_s3
import (
"fmt"
"os"
"path/filepath"
"strings"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"m7s.live/v5"
"m7s.live/v5/pkg/task"
"m7s.live/v5/plugin/s3/pb"
)
// 上传任务队列工作器
type UploadQueueTask struct {
task.Work
}
var uploadQueueTask UploadQueueTask
// 文件上传任务
type FileUploadTask struct {
task.Task
plugin *S3Plugin
filePath string
objectKey string
deleteAfterUpload bool
}
type S3Plugin struct {
pb.UnimplementedApiServer
m7s.Plugin
Endpoint string `desc:"S3 service endpoint, such as MinIO address"`
Region string `default:"us-east-1" desc:"AWS region"`
AccessKeyID string `desc:"S3 access key ID"`
SecretAccessKey string `desc:"S3 secret access key"`
Bucket string `desc:"S3 bucket name"`
PathPrefix string `desc:"file path prefix"`
ForcePathStyle bool `desc:"force path style (required for MinIO)"`
UseSSL bool `default:"true" desc:"whether to use SSL"`
Auto bool `desc:"whether to automatically upload recorded files"`
DeleteAfterUpload bool `desc:"whether to delete local file after successful upload"`
Timeout int `default:"30" desc:"upload timeout in seconds"`
s3Client *s3.S3
}
var _ = m7s.InstallPlugin[S3Plugin](m7s.PluginMeta{
ServiceDesc: &pb.Api_ServiceDesc,
RegisterGRPCHandler: pb.RegisterApiHandler,
})
// 全局S3插件实例
var s3PluginInstance *S3Plugin
func init() {
// 将上传队列任务添加到服务器
m7s.Servers.AddTask(&uploadQueueTask)
}
func (p *S3Plugin) Start() error {
// 设置全局实例
s3PluginInstance = p
// Set default configuration
if p.Region == "" {
p.Region = "us-east-1"
}
if p.Timeout == 0 {
p.Timeout = 30
}
// Create AWS session configuration
config := &aws.Config{
Region: aws.String(p.Region),
Credentials: credentials.NewStaticCredentials(p.AccessKeyID, p.SecretAccessKey, ""),
S3ForcePathStyle: aws.Bool(p.ForcePathStyle),
}
// Set endpoint if provided (for MinIO or other S3-compatible services)
if p.Endpoint != "" {
protocol := "http"
if p.UseSSL {
protocol = "https"
}
endpoint := p.Endpoint
if !strings.HasPrefix(endpoint, "http") {
endpoint = protocol + "://" + endpoint
}
config.Endpoint = aws.String(endpoint)
config.DisableSSL = aws.Bool(!p.UseSSL)
}
// Create AWS session
sess, err := session.NewSession(config)
if err != nil {
return fmt.Errorf("failed to create AWS session: %v", err)
}
// Create S3 client
p.s3Client = s3.New(sess)
// Test connection
if err := p.testConnection(); err != nil {
return fmt.Errorf("S3 connection test failed: %v", err)
}
p.Info("S3 plugin initialized successfully")
return nil
}
// testConnection tests the S3 connection
func (p *S3Plugin) testConnection() error {
// Try to list buckets to test connection
_, err := p.s3Client.ListBuckets(&s3.ListBucketsInput{})
if err != nil {
return err
}
p.Info("S3 connection test successful")
return nil
}
// uploadFile uploads a file to S3
func (p *S3Plugin) uploadFile(filePath, objectKey string) error {
file, err := os.Open(filePath)
if err != nil {
return err
}
defer file.Close()
fileInfo, err := file.Stat()
if err != nil {
return err
}
// Add path prefix if configured
if p.PathPrefix != "" {
objectKey = strings.TrimSuffix(p.PathPrefix, "/") + "/" + objectKey
}
// Upload file to S3
input := &s3.PutObjectInput{
Bucket: aws.String(p.Bucket),
Key: aws.String(objectKey),
Body: file,
ContentLength: aws.Int64(fileInfo.Size()),
ContentType: aws.String("application/octet-stream"),
}
_, err = p.s3Client.PutObject(input)
if err != nil {
return err
}
p.Info("File uploaded successfully", "objectKey", objectKey, "size", fileInfo.Size())
return nil
}
// FileUploadTask的Start方法
func (task *FileUploadTask) Start() error {
task.Info("Starting file upload", "filePath", task.filePath, "objectKey", task.objectKey)
return nil
}
// FileUploadTask的Run方法
func (task *FileUploadTask) Run() error {
// 检查文件是否存在
if _, err := os.Stat(task.filePath); os.IsNotExist(err) {
return fmt.Errorf("file does not exist: %s", task.filePath)
}
// 执行上传
err := task.plugin.uploadFile(task.filePath, task.objectKey)
if err != nil {
task.Error("Failed to upload file", "error", err, "filePath", task.filePath)
return err
}
// 如果配置了上传后删除,则删除本地文件
if task.deleteAfterUpload {
if err := os.Remove(task.filePath); err != nil {
task.Warn("Failed to delete local file after upload", "error", err, "filePath", task.filePath)
} else {
task.Info("Local file deleted after successful upload", "filePath", task.filePath)
}
}
task.Info("File upload completed successfully", "filePath", task.filePath, "objectKey", task.objectKey)
return nil
}
// 队列上传文件方法
func (p *S3Plugin) QueueUpload(filePath, objectKey string, deleteAfter bool) {
if !p.Auto {
p.Debug("Auto upload is disabled, skipping upload", "filePath", filePath)
return
}
uploadTask := &FileUploadTask{
plugin: p,
filePath: filePath,
objectKey: objectKey,
deleteAfterUpload: deleteAfter || p.DeleteAfterUpload,
}
// 将上传任务添加到队列
uploadQueueTask.AddTask(uploadTask, p.Logger.With("filePath", filePath, "objectKey", objectKey))
p.Info("File upload queued", "filePath", filePath, "objectKey", objectKey)
}
// 生成S3对象键的辅助方法
func (p *S3Plugin) generateObjectKey(filePath string) string {
// 获取文件名
fileName := filepath.Base(filePath)
// 如果配置了路径前缀,则添加前缀
if p.PathPrefix != "" {
return strings.TrimSuffix(p.PathPrefix, "/") + "/" + fileName
}
return fileName
}
// TriggerUpload 全局函数,供其他插件调用以触发S3上传
func TriggerUpload(filePath string, deleteAfter bool) {
if s3PluginInstance == nil {
return // S3插件未启用或未初始化
}
objectKey := s3PluginInstance.generateObjectKey(filePath)
s3PluginInstance.QueueUpload(filePath, objectKey, deleteAfter)
}
-803
View File
@@ -1,803 +0,0 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.36.6
// protoc v5.29.3
// source: s3.proto
package pb
import (
_ "google.golang.org/genproto/googleapis/api/annotations"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
emptypb "google.golang.org/protobuf/types/known/emptypb"
pb "m7s.live/v5/pb"
reflect "reflect"
sync "sync"
unsafe "unsafe"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type UploadRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
Filename string `protobuf:"bytes,1,opt,name=filename,proto3" json:"filename,omitempty"` // File name
Content []byte `protobuf:"bytes,2,opt,name=content,proto3" json:"content,omitempty"` // File content
ContentType string `protobuf:"bytes,3,opt,name=content_type,json=contentType,proto3" json:"content_type,omitempty"` // MIME type
Bucket string `protobuf:"bytes,4,opt,name=bucket,proto3" json:"bucket,omitempty"` // Bucket name (optional, uses default if empty)
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *UploadRequest) Reset() {
*x = UploadRequest{}
mi := &file_s3_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *UploadRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*UploadRequest) ProtoMessage() {}
func (x *UploadRequest) ProtoReflect() protoreflect.Message {
mi := &file_s3_proto_msgTypes[0]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use UploadRequest.ProtoReflect.Descriptor instead.
func (*UploadRequest) Descriptor() ([]byte, []int) {
return file_s3_proto_rawDescGZIP(), []int{0}
}
func (x *UploadRequest) GetFilename() string {
if x != nil {
return x.Filename
}
return ""
}
func (x *UploadRequest) GetContent() []byte {
if x != nil {
return x.Content
}
return nil
}
func (x *UploadRequest) GetContentType() string {
if x != nil {
return x.ContentType
}
return ""
}
func (x *UploadRequest) GetBucket() string {
if x != nil {
return x.Bucket
}
return ""
}
type UploadResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
Code uint32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"`
Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
Data *UploadData `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *UploadResponse) Reset() {
*x = UploadResponse{}
mi := &file_s3_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *UploadResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*UploadResponse) ProtoMessage() {}
func (x *UploadResponse) ProtoReflect() protoreflect.Message {
mi := &file_s3_proto_msgTypes[1]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use UploadResponse.ProtoReflect.Descriptor instead.
func (*UploadResponse) Descriptor() ([]byte, []int) {
return file_s3_proto_rawDescGZIP(), []int{1}
}
func (x *UploadResponse) GetCode() uint32 {
if x != nil {
return x.Code
}
return 0
}
func (x *UploadResponse) GetMessage() string {
if x != nil {
return x.Message
}
return ""
}
func (x *UploadResponse) GetData() *UploadData {
if x != nil {
return x.Data
}
return nil
}
type UploadData struct {
state protoimpl.MessageState `protogen:"open.v1"`
Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` // S3 object key
Url string `protobuf:"bytes,2,opt,name=url,proto3" json:"url,omitempty"` // Public URL
Size int64 `protobuf:"varint,3,opt,name=size,proto3" json:"size,omitempty"` // File size in bytes
Etag string `protobuf:"bytes,4,opt,name=etag,proto3" json:"etag,omitempty"` // ETag from S3
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *UploadData) Reset() {
*x = UploadData{}
mi := &file_s3_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *UploadData) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*UploadData) ProtoMessage() {}
func (x *UploadData) ProtoReflect() protoreflect.Message {
mi := &file_s3_proto_msgTypes[2]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use UploadData.ProtoReflect.Descriptor instead.
func (*UploadData) Descriptor() ([]byte, []int) {
return file_s3_proto_rawDescGZIP(), []int{2}
}
func (x *UploadData) GetKey() string {
if x != nil {
return x.Key
}
return ""
}
func (x *UploadData) GetUrl() string {
if x != nil {
return x.Url
}
return ""
}
func (x *UploadData) GetSize() int64 {
if x != nil {
return x.Size
}
return 0
}
func (x *UploadData) GetEtag() string {
if x != nil {
return x.Etag
}
return ""
}
type ListRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
Prefix string `protobuf:"bytes,1,opt,name=prefix,proto3" json:"prefix,omitempty"` // Prefix filter
MaxKeys int32 `protobuf:"varint,2,opt,name=max_keys,json=maxKeys,proto3" json:"max_keys,omitempty"` // Maximum number of keys to return
Marker string `protobuf:"bytes,3,opt,name=marker,proto3" json:"marker,omitempty"` // Pagination marker
Bucket string `protobuf:"bytes,4,opt,name=bucket,proto3" json:"bucket,omitempty"` // Bucket name (optional, uses default if empty)
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *ListRequest) Reset() {
*x = ListRequest{}
mi := &file_s3_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *ListRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ListRequest) ProtoMessage() {}
func (x *ListRequest) ProtoReflect() protoreflect.Message {
mi := &file_s3_proto_msgTypes[3]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ListRequest.ProtoReflect.Descriptor instead.
func (*ListRequest) Descriptor() ([]byte, []int) {
return file_s3_proto_rawDescGZIP(), []int{3}
}
func (x *ListRequest) GetPrefix() string {
if x != nil {
return x.Prefix
}
return ""
}
func (x *ListRequest) GetMaxKeys() int32 {
if x != nil {
return x.MaxKeys
}
return 0
}
func (x *ListRequest) GetMarker() string {
if x != nil {
return x.Marker
}
return ""
}
func (x *ListRequest) GetBucket() string {
if x != nil {
return x.Bucket
}
return ""
}
type ListResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
Code uint32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"`
Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
Data *ListData `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *ListResponse) Reset() {
*x = ListResponse{}
mi := &file_s3_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *ListResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ListResponse) ProtoMessage() {}
func (x *ListResponse) ProtoReflect() protoreflect.Message {
mi := &file_s3_proto_msgTypes[4]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ListResponse.ProtoReflect.Descriptor instead.
func (*ListResponse) Descriptor() ([]byte, []int) {
return file_s3_proto_rawDescGZIP(), []int{4}
}
func (x *ListResponse) GetCode() uint32 {
if x != nil {
return x.Code
}
return 0
}
func (x *ListResponse) GetMessage() string {
if x != nil {
return x.Message
}
return ""
}
func (x *ListResponse) GetData() *ListData {
if x != nil {
return x.Data
}
return nil
}
type ListData struct {
state protoimpl.MessageState `protogen:"open.v1"`
Objects []*S3Object `protobuf:"bytes,1,rep,name=objects,proto3" json:"objects,omitempty"`
IsTruncated bool `protobuf:"varint,2,opt,name=is_truncated,json=isTruncated,proto3" json:"is_truncated,omitempty"` // Whether there are more results
NextMarker string `protobuf:"bytes,3,opt,name=next_marker,json=nextMarker,proto3" json:"next_marker,omitempty"` // Next pagination marker
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *ListData) Reset() {
*x = ListData{}
mi := &file_s3_proto_msgTypes[5]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *ListData) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ListData) ProtoMessage() {}
func (x *ListData) ProtoReflect() protoreflect.Message {
mi := &file_s3_proto_msgTypes[5]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ListData.ProtoReflect.Descriptor instead.
func (*ListData) Descriptor() ([]byte, []int) {
return file_s3_proto_rawDescGZIP(), []int{5}
}
func (x *ListData) GetObjects() []*S3Object {
if x != nil {
return x.Objects
}
return nil
}
func (x *ListData) GetIsTruncated() bool {
if x != nil {
return x.IsTruncated
}
return false
}
func (x *ListData) GetNextMarker() string {
if x != nil {
return x.NextMarker
}
return ""
}
type S3Object struct {
state protoimpl.MessageState `protogen:"open.v1"`
Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` // Object key
Size int64 `protobuf:"varint,2,opt,name=size,proto3" json:"size,omitempty"` // Object size in bytes
LastModified string `protobuf:"bytes,3,opt,name=last_modified,json=lastModified,proto3" json:"last_modified,omitempty"` // Last modified timestamp
Etag string `protobuf:"bytes,4,opt,name=etag,proto3" json:"etag,omitempty"` // ETag
StorageClass string `protobuf:"bytes,5,opt,name=storage_class,json=storageClass,proto3" json:"storage_class,omitempty"` // Storage class
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *S3Object) Reset() {
*x = S3Object{}
mi := &file_s3_proto_msgTypes[6]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *S3Object) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*S3Object) ProtoMessage() {}
func (x *S3Object) ProtoReflect() protoreflect.Message {
mi := &file_s3_proto_msgTypes[6]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use S3Object.ProtoReflect.Descriptor instead.
func (*S3Object) Descriptor() ([]byte, []int) {
return file_s3_proto_rawDescGZIP(), []int{6}
}
func (x *S3Object) GetKey() string {
if x != nil {
return x.Key
}
return ""
}
func (x *S3Object) GetSize() int64 {
if x != nil {
return x.Size
}
return 0
}
func (x *S3Object) GetLastModified() string {
if x != nil {
return x.LastModified
}
return ""
}
func (x *S3Object) GetEtag() string {
if x != nil {
return x.Etag
}
return ""
}
func (x *S3Object) GetStorageClass() string {
if x != nil {
return x.StorageClass
}
return ""
}
type DeleteRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` // Object key to delete
Bucket string `protobuf:"bytes,2,opt,name=bucket,proto3" json:"bucket,omitempty"` // Bucket name (optional, uses default if empty)
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *DeleteRequest) Reset() {
*x = DeleteRequest{}
mi := &file_s3_proto_msgTypes[7]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *DeleteRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*DeleteRequest) ProtoMessage() {}
func (x *DeleteRequest) ProtoReflect() protoreflect.Message {
mi := &file_s3_proto_msgTypes[7]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use DeleteRequest.ProtoReflect.Descriptor instead.
func (*DeleteRequest) Descriptor() ([]byte, []int) {
return file_s3_proto_rawDescGZIP(), []int{7}
}
func (x *DeleteRequest) GetKey() string {
if x != nil {
return x.Key
}
return ""
}
func (x *DeleteRequest) GetBucket() string {
if x != nil {
return x.Bucket
}
return ""
}
type ConnectionResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
Code uint32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"`
Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
Data *ConnectionData `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *ConnectionResponse) Reset() {
*x = ConnectionResponse{}
mi := &file_s3_proto_msgTypes[8]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *ConnectionResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ConnectionResponse) ProtoMessage() {}
func (x *ConnectionResponse) ProtoReflect() protoreflect.Message {
mi := &file_s3_proto_msgTypes[8]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ConnectionResponse.ProtoReflect.Descriptor instead.
func (*ConnectionResponse) Descriptor() ([]byte, []int) {
return file_s3_proto_rawDescGZIP(), []int{8}
}
func (x *ConnectionResponse) GetCode() uint32 {
if x != nil {
return x.Code
}
return 0
}
func (x *ConnectionResponse) GetMessage() string {
if x != nil {
return x.Message
}
return ""
}
func (x *ConnectionResponse) GetData() *ConnectionData {
if x != nil {
return x.Data
}
return nil
}
type ConnectionData struct {
state protoimpl.MessageState `protogen:"open.v1"`
Connected bool `protobuf:"varint,1,opt,name=connected,proto3" json:"connected,omitempty"` // Whether connection is successful
Endpoint string `protobuf:"bytes,2,opt,name=endpoint,proto3" json:"endpoint,omitempty"` // S3 endpoint
Region string `protobuf:"bytes,3,opt,name=region,proto3" json:"region,omitempty"` // AWS region
UseSsl bool `protobuf:"varint,4,opt,name=use_ssl,json=useSsl,proto3" json:"use_ssl,omitempty"` // Whether SSL is enabled
Bucket string `protobuf:"bytes,5,opt,name=bucket,proto3" json:"bucket,omitempty"` // Default bucket name
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *ConnectionData) Reset() {
*x = ConnectionData{}
mi := &file_s3_proto_msgTypes[9]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *ConnectionData) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ConnectionData) ProtoMessage() {}
func (x *ConnectionData) ProtoReflect() protoreflect.Message {
mi := &file_s3_proto_msgTypes[9]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ConnectionData.ProtoReflect.Descriptor instead.
func (*ConnectionData) Descriptor() ([]byte, []int) {
return file_s3_proto_rawDescGZIP(), []int{9}
}
func (x *ConnectionData) GetConnected() bool {
if x != nil {
return x.Connected
}
return false
}
func (x *ConnectionData) GetEndpoint() string {
if x != nil {
return x.Endpoint
}
return ""
}
func (x *ConnectionData) GetRegion() string {
if x != nil {
return x.Region
}
return ""
}
func (x *ConnectionData) GetUseSsl() bool {
if x != nil {
return x.UseSsl
}
return false
}
func (x *ConnectionData) GetBucket() string {
if x != nil {
return x.Bucket
}
return ""
}
var File_s3_proto protoreflect.FileDescriptor
const file_s3_proto_rawDesc = "" +
"\n" +
"\bs3.proto\x12\x02s3\x1a\x1cgoogle/api/annotations.proto\x1a\x1bgoogle/protobuf/empty.proto\x1a\fglobal.proto\"\x80\x01\n" +
"\rUploadRequest\x12\x1a\n" +
"\bfilename\x18\x01 \x01(\tR\bfilename\x12\x18\n" +
"\acontent\x18\x02 \x01(\fR\acontent\x12!\n" +
"\fcontent_type\x18\x03 \x01(\tR\vcontentType\x12\x16\n" +
"\x06bucket\x18\x04 \x01(\tR\x06bucket\"b\n" +
"\x0eUploadResponse\x12\x12\n" +
"\x04code\x18\x01 \x01(\rR\x04code\x12\x18\n" +
"\amessage\x18\x02 \x01(\tR\amessage\x12\"\n" +
"\x04data\x18\x03 \x01(\v2\x0e.s3.UploadDataR\x04data\"X\n" +
"\n" +
"UploadData\x12\x10\n" +
"\x03key\x18\x01 \x01(\tR\x03key\x12\x10\n" +
"\x03url\x18\x02 \x01(\tR\x03url\x12\x12\n" +
"\x04size\x18\x03 \x01(\x03R\x04size\x12\x12\n" +
"\x04etag\x18\x04 \x01(\tR\x04etag\"p\n" +
"\vListRequest\x12\x16\n" +
"\x06prefix\x18\x01 \x01(\tR\x06prefix\x12\x19\n" +
"\bmax_keys\x18\x02 \x01(\x05R\amaxKeys\x12\x16\n" +
"\x06marker\x18\x03 \x01(\tR\x06marker\x12\x16\n" +
"\x06bucket\x18\x04 \x01(\tR\x06bucket\"^\n" +
"\fListResponse\x12\x12\n" +
"\x04code\x18\x01 \x01(\rR\x04code\x12\x18\n" +
"\amessage\x18\x02 \x01(\tR\amessage\x12 \n" +
"\x04data\x18\x03 \x01(\v2\f.s3.ListDataR\x04data\"v\n" +
"\bListData\x12&\n" +
"\aobjects\x18\x01 \x03(\v2\f.s3.S3ObjectR\aobjects\x12!\n" +
"\fis_truncated\x18\x02 \x01(\bR\visTruncated\x12\x1f\n" +
"\vnext_marker\x18\x03 \x01(\tR\n" +
"nextMarker\"\x8e\x01\n" +
"\bS3Object\x12\x10\n" +
"\x03key\x18\x01 \x01(\tR\x03key\x12\x12\n" +
"\x04size\x18\x02 \x01(\x03R\x04size\x12#\n" +
"\rlast_modified\x18\x03 \x01(\tR\flastModified\x12\x12\n" +
"\x04etag\x18\x04 \x01(\tR\x04etag\x12#\n" +
"\rstorage_class\x18\x05 \x01(\tR\fstorageClass\"9\n" +
"\rDeleteRequest\x12\x10\n" +
"\x03key\x18\x01 \x01(\tR\x03key\x12\x16\n" +
"\x06bucket\x18\x02 \x01(\tR\x06bucket\"j\n" +
"\x12ConnectionResponse\x12\x12\n" +
"\x04code\x18\x01 \x01(\rR\x04code\x12\x18\n" +
"\amessage\x18\x02 \x01(\tR\amessage\x12&\n" +
"\x04data\x18\x03 \x01(\v2\x12.s3.ConnectionDataR\x04data\"\x93\x01\n" +
"\x0eConnectionData\x12\x1c\n" +
"\tconnected\x18\x01 \x01(\bR\tconnected\x12\x1a\n" +
"\bendpoint\x18\x02 \x01(\tR\bendpoint\x12\x16\n" +
"\x06region\x18\x03 \x01(\tR\x06region\x12\x17\n" +
"\ause_ssl\x18\x04 \x01(\bR\x06useSsl\x12\x16\n" +
"\x06bucket\x18\x05 \x01(\tR\x06bucket2\xc8\x02\n" +
"\x03api\x12J\n" +
"\x06Upload\x12\x11.s3.UploadRequest\x1a\x12.s3.UploadResponse\"\x19\x82\xd3\xe4\x93\x02\x13:\x01*\"\x0e/s3/api/upload\x12?\n" +
"\x04List\x12\x0f.s3.ListRequest\x1a\x10.s3.ListResponse\"\x14\x82\xd3\xe4\x93\x02\x0e\x12\f/s3/api/list\x12U\n" +
"\x06Delete\x12\x11.s3.DeleteRequest\x1a\x17.global.SuccessResponse\"\x1f\x82\xd3\xe4\x93\x02\x19*\x17/s3/api/delete/{key=**}\x12]\n" +
"\x0fCheckConnection\x12\x16.google.protobuf.Empty\x1a\x16.s3.ConnectionResponse\"\x1a\x82\xd3\xe4\x93\x02\x14\x12\x12/s3/api/connectionB\x1aZ\x18m7s.live/v5/plugin/s3/pbb\x06proto3"
var (
file_s3_proto_rawDescOnce sync.Once
file_s3_proto_rawDescData []byte
)
func file_s3_proto_rawDescGZIP() []byte {
file_s3_proto_rawDescOnce.Do(func() {
file_s3_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_s3_proto_rawDesc), len(file_s3_proto_rawDesc)))
})
return file_s3_proto_rawDescData
}
var file_s3_proto_msgTypes = make([]protoimpl.MessageInfo, 10)
var file_s3_proto_goTypes = []any{
(*UploadRequest)(nil), // 0: s3.UploadRequest
(*UploadResponse)(nil), // 1: s3.UploadResponse
(*UploadData)(nil), // 2: s3.UploadData
(*ListRequest)(nil), // 3: s3.ListRequest
(*ListResponse)(nil), // 4: s3.ListResponse
(*ListData)(nil), // 5: s3.ListData
(*S3Object)(nil), // 6: s3.S3Object
(*DeleteRequest)(nil), // 7: s3.DeleteRequest
(*ConnectionResponse)(nil), // 8: s3.ConnectionResponse
(*ConnectionData)(nil), // 9: s3.ConnectionData
(*emptypb.Empty)(nil), // 10: google.protobuf.Empty
(*pb.SuccessResponse)(nil), // 11: global.SuccessResponse
}
var file_s3_proto_depIdxs = []int32{
2, // 0: s3.UploadResponse.data:type_name -> s3.UploadData
5, // 1: s3.ListResponse.data:type_name -> s3.ListData
6, // 2: s3.ListData.objects:type_name -> s3.S3Object
9, // 3: s3.ConnectionResponse.data:type_name -> s3.ConnectionData
0, // 4: s3.api.Upload:input_type -> s3.UploadRequest
3, // 5: s3.api.List:input_type -> s3.ListRequest
7, // 6: s3.api.Delete:input_type -> s3.DeleteRequest
10, // 7: s3.api.CheckConnection:input_type -> google.protobuf.Empty
1, // 8: s3.api.Upload:output_type -> s3.UploadResponse
4, // 9: s3.api.List:output_type -> s3.ListResponse
11, // 10: s3.api.Delete:output_type -> global.SuccessResponse
8, // 11: s3.api.CheckConnection:output_type -> s3.ConnectionResponse
8, // [8:12] is the sub-list for method output_type
4, // [4:8] is the sub-list for method input_type
4, // [4:4] is the sub-list for extension type_name
4, // [4:4] is the sub-list for extension extendee
0, // [0:4] is the sub-list for field type_name
}
func init() { file_s3_proto_init() }
func file_s3_proto_init() {
if File_s3_proto != nil {
return
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: unsafe.Slice(unsafe.StringData(file_s3_proto_rawDesc), len(file_s3_proto_rawDesc)),
NumEnums: 0,
NumMessages: 10,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_s3_proto_goTypes,
DependencyIndexes: file_s3_proto_depIdxs,
MessageInfos: file_s3_proto_msgTypes,
}.Build()
File_s3_proto = out.File
file_s3_proto_goTypes = nil
file_s3_proto_depIdxs = nil
}
-441
View File
@@ -1,441 +0,0 @@
// Code generated by protoc-gen-grpc-gateway. DO NOT EDIT.
// source: s3.proto
/*
Package pb is a reverse proxy.
It translates gRPC into RESTful JSON APIs.
*/
package pb
import (
"context"
"io"
"net/http"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/grpc-ecosystem/grpc-gateway/v2/utilities"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/emptypb"
)
// Suppress "imported and not used" errors
var _ codes.Code
var _ io.Reader
var _ status.Status
var _ = runtime.String
var _ = utilities.NewDoubleArray
var _ = metadata.Join
func request_Api_Upload_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq UploadRequest
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := client.Upload(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Api_Upload_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq UploadRequest
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := server.Upload(ctx, &protoReq)
return msg, metadata, err
}
var (
filter_Api_List_0 = &utilities.DoubleArray{Encoding: map[string]int{}, Base: []int(nil), Check: []int(nil)}
)
func request_Api_List_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq ListRequest
var metadata runtime.ServerMetadata
if err := req.ParseForm(); err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
if err := runtime.PopulateQueryParameters(&protoReq, req.Form, filter_Api_List_0); err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := client.List(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Api_List_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq ListRequest
var metadata runtime.ServerMetadata
if err := req.ParseForm(); err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
if err := runtime.PopulateQueryParameters(&protoReq, req.Form, filter_Api_List_0); err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := server.List(ctx, &protoReq)
return msg, metadata, err
}
var (
filter_Api_Delete_0 = &utilities.DoubleArray{Encoding: map[string]int{"key": 0}, Base: []int{1, 1, 0}, Check: []int{0, 1, 2}}
)
func request_Api_Delete_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq DeleteRequest
var metadata runtime.ServerMetadata
var (
val string
ok bool
err error
_ = err
)
val, ok = pathParams["key"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "key")
}
protoReq.Key, err = runtime.String(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "key", err)
}
if err := req.ParseForm(); err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
if err := runtime.PopulateQueryParameters(&protoReq, req.Form, filter_Api_Delete_0); err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := client.Delete(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Api_Delete_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq DeleteRequest
var metadata runtime.ServerMetadata
var (
val string
ok bool
err error
_ = err
)
val, ok = pathParams["key"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "key")
}
protoReq.Key, err = runtime.String(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "key", err)
}
if err := req.ParseForm(); err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
if err := runtime.PopulateQueryParameters(&protoReq, req.Form, filter_Api_Delete_0); err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := server.Delete(ctx, &protoReq)
return msg, metadata, err
}
func request_Api_CheckConnection_0(ctx context.Context, marshaler runtime.Marshaler, client ApiClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq emptypb.Empty
var metadata runtime.ServerMetadata
msg, err := client.CheckConnection(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Api_CheckConnection_0(ctx context.Context, marshaler runtime.Marshaler, server ApiServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq emptypb.Empty
var metadata runtime.ServerMetadata
msg, err := server.CheckConnection(ctx, &protoReq)
return msg, metadata, err
}
// RegisterApiHandlerServer registers the http handlers for service Api to "mux".
// UnaryRPC :call ApiServer directly.
// StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906.
// Note that using this registration option will cause many gRPC library features to stop working. Consider using RegisterApiHandlerFromEndpoint instead.
func RegisterApiHandlerServer(ctx context.Context, mux *runtime.ServeMux, server ApiServer) error {
mux.Handle("POST", pattern_Api_Upload_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/s3.Api/Upload", runtime.WithHTTPPathPattern("/s3/api/upload"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Api_Upload_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_Upload_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("GET", pattern_Api_List_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/s3.Api/List", runtime.WithHTTPPathPattern("/s3/api/list"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Api_List_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_List_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("DELETE", pattern_Api_Delete_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/s3.Api/Delete", runtime.WithHTTPPathPattern("/s3/api/delete/{key=**}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Api_Delete_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_Delete_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("GET", pattern_Api_CheckConnection_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/s3.Api/CheckConnection", runtime.WithHTTPPathPattern("/s3/api/connection"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Api_CheckConnection_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_CheckConnection_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
return nil
}
// RegisterApiHandlerFromEndpoint is same as RegisterApiHandler but
// automatically dials to "endpoint" and closes the connection when "ctx" gets done.
func RegisterApiHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) (err error) {
conn, err := grpc.DialContext(ctx, endpoint, opts...)
if err != nil {
return err
}
defer func() {
if err != nil {
if cerr := conn.Close(); cerr != nil {
grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr)
}
return
}
go func() {
<-ctx.Done()
if cerr := conn.Close(); cerr != nil {
grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr)
}
}()
}()
return RegisterApiHandler(ctx, mux, conn)
}
// RegisterApiHandler registers the http handlers for service Api to "mux".
// The handlers forward requests to the grpc endpoint over "conn".
func RegisterApiHandler(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error {
return RegisterApiHandlerClient(ctx, mux, NewApiClient(conn))
}
// RegisterApiHandlerClient registers the http handlers for service Api
// to "mux". The handlers forward requests to the grpc endpoint over the given implementation of "ApiClient".
// Note: the gRPC framework executes interceptors within the gRPC handler. If the passed in "ApiClient"
// doesn't go through the normal gRPC flow (creating a gRPC client etc.) then it will be up to the passed in
// "ApiClient" to call the correct interceptors.
func RegisterApiHandlerClient(ctx context.Context, mux *runtime.ServeMux, client ApiClient) error {
mux.Handle("POST", pattern_Api_Upload_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/s3.Api/Upload", runtime.WithHTTPPathPattern("/s3/api/upload"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Api_Upload_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_Upload_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("GET", pattern_Api_List_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/s3.Api/List", runtime.WithHTTPPathPattern("/s3/api/list"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Api_List_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_List_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("DELETE", pattern_Api_Delete_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/s3.Api/Delete", runtime.WithHTTPPathPattern("/s3/api/delete/{key=**}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Api_Delete_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_Delete_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("GET", pattern_Api_CheckConnection_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/s3.Api/CheckConnection", runtime.WithHTTPPathPattern("/s3/api/connection"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Api_CheckConnection_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Api_CheckConnection_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
return nil
}
var (
pattern_Api_Upload_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"s3", "api", "upload"}, ""))
pattern_Api_List_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"s3", "api", "list"}, ""))
pattern_Api_Delete_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 3, 0, 4, 1, 5, 3}, []string{"s3", "api", "delete", "key"}, ""))
pattern_Api_CheckConnection_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"s3", "api", "connection"}, ""))
)
var (
forward_Api_Upload_0 = runtime.ForwardResponseMessage
forward_Api_List_0 = runtime.ForwardResponseMessage
forward_Api_Delete_0 = runtime.ForwardResponseMessage
forward_Api_CheckConnection_0 = runtime.ForwardResponseMessage
)
-96
View File
@@ -1,96 +0,0 @@
syntax = "proto3";
import "google/api/annotations.proto";
import "google/protobuf/empty.proto";
import "global.proto";
package s3;
option go_package="m7s.live/v5/plugin/s3/pb";
service api {
rpc Upload (UploadRequest) returns (UploadResponse) {
option (google.api.http) = {
post: "/s3/api/upload"
body: "*"
};
}
rpc List (ListRequest) returns (ListResponse) {
option (google.api.http) = {
get: "/s3/api/list"
};
}
rpc Delete (DeleteRequest) returns (global.SuccessResponse) {
option (google.api.http) = {
delete: "/s3/api/delete/{key=**}"
};
}
rpc CheckConnection (google.protobuf.Empty) returns (ConnectionResponse) {
option (google.api.http) = {
get: "/s3/api/connection"
};
}
}
message UploadRequest {
string filename = 1; // File name
bytes content = 2; // File content
string content_type = 3; // MIME type
string bucket = 4; // Bucket name (optional, uses default if empty)
}
message UploadResponse {
uint32 code = 1;
string message = 2;
UploadData data = 3;
}
message UploadData {
string key = 1; // S3 object key
string url = 2; // Public URL
int64 size = 3; // File size in bytes
string etag = 4; // ETag from S3
}
message ListRequest {
string prefix = 1; // Prefix filter
int32 max_keys = 2; // Maximum number of keys to return
string marker = 3; // Pagination marker
string bucket = 4; // Bucket name (optional, uses default if empty)
}
message ListResponse {
uint32 code = 1;
string message = 2;
ListData data = 3;
}
message ListData {
repeated S3Object objects = 1;
bool is_truncated = 2; // Whether there are more results
string next_marker = 3; // Next pagination marker
}
message S3Object {
string key = 1; // Object key
int64 size = 2; // Object size in bytes
string last_modified = 3; // Last modified timestamp
string etag = 4; // ETag
string storage_class = 5; // Storage class
}
message DeleteRequest {
string key = 1; // Object key to delete
string bucket = 2; // Bucket name (optional, uses default if empty)
}
message ConnectionResponse {
uint32 code = 1;
string message = 2;
ConnectionData data = 3;
}
message ConnectionData {
bool connected = 1; // Whether connection is successful
string endpoint = 2; // S3 endpoint
string region = 3; // AWS region
bool use_ssl = 4; // Whether SSL is enabled
string bucket = 5; // Default bucket name
}
-237
View File
@@ -1,237 +0,0 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.5.1
// - protoc v5.29.3
// source: s3.proto
package pb
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
emptypb "google.golang.org/protobuf/types/known/emptypb"
pb "m7s.live/v5/pb"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.64.0 or later.
const _ = grpc.SupportPackageIsVersion9
const (
Api_Upload_FullMethodName = "/s3.api/Upload"
Api_List_FullMethodName = "/s3.api/List"
Api_Delete_FullMethodName = "/s3.api/Delete"
Api_CheckConnection_FullMethodName = "/s3.api/CheckConnection"
)
// ApiClient is the client API for Api service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type ApiClient interface {
Upload(ctx context.Context, in *UploadRequest, opts ...grpc.CallOption) (*UploadResponse, error)
List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (*ListResponse, error)
Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*pb.SuccessResponse, error)
CheckConnection(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ConnectionResponse, error)
}
type apiClient struct {
cc grpc.ClientConnInterface
}
func NewApiClient(cc grpc.ClientConnInterface) ApiClient {
return &apiClient{cc}
}
func (c *apiClient) Upload(ctx context.Context, in *UploadRequest, opts ...grpc.CallOption) (*UploadResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(UploadResponse)
err := c.cc.Invoke(ctx, Api_Upload_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *apiClient) List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (*ListResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(ListResponse)
err := c.cc.Invoke(ctx, Api_List_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *apiClient) Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*pb.SuccessResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(pb.SuccessResponse)
err := c.cc.Invoke(ctx, Api_Delete_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *apiClient) CheckConnection(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ConnectionResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(ConnectionResponse)
err := c.cc.Invoke(ctx, Api_CheckConnection_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
// ApiServer is the server API for Api service.
// All implementations must embed UnimplementedApiServer
// for forward compatibility.
type ApiServer interface {
Upload(context.Context, *UploadRequest) (*UploadResponse, error)
List(context.Context, *ListRequest) (*ListResponse, error)
Delete(context.Context, *DeleteRequest) (*pb.SuccessResponse, error)
CheckConnection(context.Context, *emptypb.Empty) (*ConnectionResponse, error)
mustEmbedUnimplementedApiServer()
}
// UnimplementedApiServer must be embedded to have
// forward compatible implementations.
//
// NOTE: this should be embedded by value instead of pointer to avoid a nil
// pointer dereference when methods are called.
type UnimplementedApiServer struct{}
func (UnimplementedApiServer) Upload(context.Context, *UploadRequest) (*UploadResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Upload not implemented")
}
func (UnimplementedApiServer) List(context.Context, *ListRequest) (*ListResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method List not implemented")
}
func (UnimplementedApiServer) Delete(context.Context, *DeleteRequest) (*pb.SuccessResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Delete not implemented")
}
func (UnimplementedApiServer) CheckConnection(context.Context, *emptypb.Empty) (*ConnectionResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method CheckConnection not implemented")
}
func (UnimplementedApiServer) mustEmbedUnimplementedApiServer() {}
func (UnimplementedApiServer) testEmbeddedByValue() {}
// UnsafeApiServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to ApiServer will
// result in compilation errors.
type UnsafeApiServer interface {
mustEmbedUnimplementedApiServer()
}
func RegisterApiServer(s grpc.ServiceRegistrar, srv ApiServer) {
// If the following call pancis, it indicates UnimplementedApiServer was
// embedded by pointer and is nil. This will cause panics if an
// unimplemented method is ever invoked, so we test this at initialization
// time to prevent it from happening at runtime later due to I/O.
if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {
t.testEmbeddedByValue()
}
s.RegisterService(&Api_ServiceDesc, srv)
}
func _Api_Upload_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(UploadRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ApiServer).Upload(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: Api_Upload_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).Upload(ctx, req.(*UploadRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Api_List_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ListRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ApiServer).List(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: Api_List_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).List(ctx, req.(*ListRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Api_Delete_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(DeleteRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ApiServer).Delete(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: Api_Delete_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).Delete(ctx, req.(*DeleteRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Api_CheckConnection_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(emptypb.Empty)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ApiServer).CheckConnection(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: Api_CheckConnection_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ApiServer).CheckConnection(ctx, req.(*emptypb.Empty))
}
return interceptor(ctx, in, info, handler)
}
// Api_ServiceDesc is the grpc.ServiceDesc for Api service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Api_ServiceDesc = grpc.ServiceDesc{
ServiceName: "s3.api",
HandlerType: (*ApiServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Upload",
Handler: _Api_Upload_Handler,
},
{
MethodName: "List",
Handler: _Api_List_Handler,
},
{
MethodName: "Delete",
Handler: _Api_Delete_Handler,
},
{
MethodName: "CheckConnection",
Handler: _Api_CheckConnection_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "s3.proto",
}
+1 -3
View File
@@ -250,9 +250,7 @@ func (t *Transformer) Start() (err error) {
case SnapConfig:
snapConfig = v
case map[string]any:
var conf config.Config
conf.Parse(&snapConfig)
conf.ParseModifyFile(v)
config.Parse(&snapConfig, v)
}
}
+3 -2
View File
@@ -3,6 +3,7 @@ package transcode
import (
"bufio"
"fmt"
"maps"
"net"
"net/url"
"os"
@@ -71,7 +72,7 @@ func (t *Transformer) Start() (err error) {
case DecodeConfig:
t.From = v
case map[string]any:
config.Parse(&t.TransRule.From, v)
config.Parse(&t.TransRule.From, maps.Clone(v))
case string:
t.From.Mode = TRANS_MODE_PIPE
t.From.Args = v
@@ -116,7 +117,7 @@ func (t *Transformer) Start() (err error) {
if to.Conf != nil {
switch v := to.Conf.(type) {
case map[string]any:
config.Parse(&enc, v)
config.Parse(&enc, maps.Clone(v))
case string:
enc.Args = v
}
+37 -7
View File
@@ -1,13 +1,13 @@
package m7s
import (
"os"
"path/filepath"
"fmt"
"time"
"gorm.io/gorm"
"m7s.live/v5/pkg/config"
"m7s.live/v5/pkg/storage"
"m7s.live/v5/pkg/task"
)
@@ -33,6 +33,7 @@ type (
SubConf *config.Subscribe
RecConf *config.Record
recorder IRecorder
storage storage.Storage // 存储实例
}
DefaultRecorder struct {
task.Task
@@ -67,16 +68,23 @@ func (r *DefaultRecorder) Start() (err error) {
func (r *DefaultRecorder) CreateStream(start time.Time, customFileName func(*RecordJob) string) (err error) {
recordJob := &r.RecordJob
sub := recordJob.Subscriber
// 生成文件路径
filePath := customFileName(recordJob)
recordJob.storage = r.createStorage(recordJob.RecConf.Storage)
if recordJob.storage == nil {
return fmt.Errorf("storage config is required")
}
r.Event.RecordStream = RecordStream{
StartTime: start,
StreamPath: sub.StreamPath,
FilePath: customFileName(recordJob),
FilePath: filePath,
Type: recordJob.RecConf.Type,
}
dir := filepath.Dir(r.Event.FilePath)
if err = os.MkdirAll(dir, 0755); err != nil {
return
}
if sub.Publisher.HasAudioTrack() {
r.Event.AudioCodec = sub.Publisher.AudioTrack.ICodecCtx.String()
}
@@ -96,6 +104,23 @@ func (r *DefaultRecorder) CreateStream(start time.Time, customFileName func(*Rec
return
}
// createStorage 创建存储实例
func (r *DefaultRecorder) createStorage(storageConfig map[string]any) storage.Storage {
for t, conf := range storageConfig {
storage, err := storage.CreateStorage(t, conf)
if err == nil {
return storage
}
}
localStorage, err := storage.CreateStorage("local", r.RecordJob.RecConf.FilePath)
if err == nil {
return localStorage
} else {
r.Error("create storage failed", "err", err)
}
return nil
}
func (r *DefaultRecorder) WriteTail(end time.Time, tailJob task.IJob) {
r.Event.EndTime = end
if r.RecordJob.Plugin.DB != nil && r.RecordJob.RecConf.Mode != config.RecordModeTest {
@@ -117,6 +142,11 @@ func (p *RecordJob) GetKey() string {
return p.RecConf.FilePath
}
// GetStorage 获取存储实例
func (p *RecordJob) GetStorage() storage.Storage {
return p.storage
}
func (p *RecordJob) Subscribe() (err error) {
p.Subscriber, err = p.Plugin.SubscribeWithConfig(p.recorder.GetTask().Context, p.StreamPath, *p.SubConf)