From 23119314329c0b1b441b711a4fc421d0ea67fd32 Mon Sep 17 00:00:00 2001 From: eanfs Date: Mon, 8 Sep 2025 08:53:15 +0800 Subject: [PATCH] feature:mp4-upload-s3 (#325) * iFLOW CLI Automated Issue Triage * Update for ai * Revert "Update for ai" This reverts commit b85978298a76af9c9a05a18261b18813fae4243c. * Update ai md * feature:mp4-upload-s3 --- .github/workflows/iflow.yml | 101 +++++++++++++++++++++ CLAUDE.md | 176 +++++++++++++++++++++++++++++++++++- GEMINI.md | 92 +++++++++++++++++++ IFLOW.md | 124 +++++++++++++++++++++++++ example/default/config.yaml | 11 +++ plugin/mp4/pkg/record.go | 18 +++- plugin/s3/index.go | 127 +++++++++++++++++++++++--- 7 files changed, 631 insertions(+), 18 deletions(-) create mode 100644 .github/workflows/iflow.yml create mode 100644 GEMINI.md create mode 100644 IFLOW.md diff --git a/.github/workflows/iflow.yml b/.github/workflows/iflow.yml new file mode 100644 index 0000000..e9c0567 --- /dev/null +++ b/.github/workflows/iflow.yml @@ -0,0 +1,101 @@ +name: '🏷️ iFLOW CLI Automated Issue Triage' + +on: + issues: + types: + - 'opened' + - 'reopened' + issue_comment: + types: + - 'created' + workflow_dispatch: + inputs: + issue_number: + description: 'issue number to triage' + required: true + type: 'number' + +concurrency: + group: '${{ github.workflow }}-${{ github.event.issue.number }}' + cancel-in-progress: true + +defaults: + run: + shell: 'bash' + +permissions: + contents: 'read' + issues: 'write' + statuses: 'write' + +jobs: + triage-issue: + if: |- + github.event_name == 'issues' || + github.event_name == 'workflow_dispatch' || + ( + github.event_name == 'issue_comment' && + contains(github.event.comment.body, '@iflow-cli /triage') && + contains(fromJSON('["OWNER", "MEMBER", "COLLABORATOR"]'), github.event.comment.author_association) + ) + timeout-minutes: 5 + runs-on: 'ubuntu-latest' + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: 'Run iFlow CLI Issue Triage' + uses: vibe-ideas/iflow-cli-action@main + id: 'iflow_cli_issue_triage' + env: + GITHUB_TOKEN: '${{ secrets.GITHUB_TOKEN }}' + ISSUE_TITLE: '${{ github.event.issue.title }}' + ISSUE_BODY: '${{ github.event.issue.body }}' + ISSUE_NUMBER: '${{ github.event.issue.number }}' + REPOSITORY: '${{ github.repository }}' + with: + api_key: ${{ secrets.IFLOW_API_KEY }} + timeout: "3600" + extra_args: "--debug" + prompt: | + ## Role + + You are an issue triage assistant. Analyze the current GitHub issue + and apply the most appropriate existing labels. Use the available + tools to gather information; do not ask for information to be + provided. + + ## Steps + + 1. Run: `gh label list` to get all available labels. + 2. Review the issue title and body provided in the environment + variables: "${ISSUE_TITLE}" and "${ISSUE_BODY}". + 3. Classify issues by their kind (bug, enhancement, documentation, + cleanup, etc) and their priority (p0, p1, p2, p3). Set the + labels according to the format `kind/*` and `priority/*` patterns. + 4. Apply the selected labels to this issue using: + `gh issue edit "${ISSUE_NUMBER}" --add-label "label1,label2"` + 5. If the "status/needs-triage" label is present, remove it using: + `gh issue edit "${ISSUE_NUMBER}" --remove-label "status/needs-triage"` + + ## Guidelines + + - Only use labels that already exist in the repository + - Do not add comments or modify the issue content + - Triage only the current issue + - Assign all applicable labels based on the issue content + - Reference all shell variables as "${VAR}" (with quotes and braces) + + - name: 'Post Issue Triage Failure Comment' + if: |- + ${{ failure() && steps.iflow_cli_issue_triage.outcome == 'failure' }} + uses: 'actions/github-script@60a0d83039c74a4aee543508d2ffcb1c3799cdea' + with: + github-token: '${{ secrets.GITHUB_TOKEN }}' + script: |- + github.rest.issues.createComment({ + owner: '${{ github.repository }}'.split('/')[0], + repo: '${{ github.repository }}'.split('/')[1], + issue_number: '${{ github.event.issue.number }}', + body: 'There is a problem with the iFlow CLI issue triaging. Please check the [action logs](${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}) for details.' + }) diff --git a/CLAUDE.md b/CLAUDE.md index f35f95f..2bc83a9 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -61,7 +61,70 @@ go test ./... **Configuration System (`pkg/config/`):** Hierarchical configuration system with priority order: dynamic modifications > environment variables > config files > default YAML > global config > defaults. -**Task System (`pkg/task/`):** Asynchronous task management with dependency handling, lifecycle management, and graceful shutdown capabilities. +**Task System (`pkg/task/`):** Advanced asynchronous task management system with multiple layers: +- **Task:** Basic unit of work with lifecycle management (Start/Run/Dispose) +- **Job:** Container that manages multiple child tasks and provides event loops +- **Work:** Special type of Job that acts as a persistent queue manager (keepalive=true) +- **Channel:** Event-driven task for handling continuous data streams + +### Task System Deep Dive + +#### Task Hierarchy and Lifecycle +``` +Work (Queue Manager) + └── Job (Container with Event Loop) + └── Task (Basic Work Unit) + ├── Start() - Initialization phase + ├── Run() - Main execution phase + └── Dispose() - Cleanup phase +``` + +#### Queue-based Asynchronous Processing +The Task system supports sophisticated queue-based processing patterns: + +1. **Work as Queue Manager:** Work instances stay alive indefinitely and manage queues of tasks +2. **Task Queuing:** Use `workInstance.AddTask(task, logger)` to queue tasks +3. **Automatic Lifecycle:** Tasks are automatically started, executed, and disposed +4. **Error Handling:** Built-in retry mechanisms and error propagation + +**Example Pattern (from S3 plugin):** +```go +type UploadQueueTask struct { + task.Work // Persistent queue manager +} + +type FileUploadTask struct { + task.Task // Individual work item + // ... task-specific fields +} + +// Initialize queue manager (typically in init()) +var uploadQueueTask UploadQueueTask +m7s.Servers.AddTask(&uploadQueueTask) + +// Queue individual tasks +uploadQueueTask.AddTask(&FileUploadTask{...}, logger) +``` + +#### Cross-Plugin Task Cooperation +Tasks can coordinate across different plugins through: + +1. **Global Instance Pattern:** Plugins expose global instances for cross-plugin access +2. **Event-based Triggers:** One plugin triggers tasks in another plugin +3. **Shared Queue Managers:** Multiple plugins can use the same Work instance + +**Example (MP4 → S3 Integration):** +```go +// In MP4 plugin: trigger S3 upload after recording completes +s3plugin.TriggerUpload(filePath, deleteAfter) + +// S3 plugin receives trigger and queues upload task +func TriggerUpload(filePath string, deleteAfter bool) { + if s3PluginInstance != nil { + s3PluginInstance.QueueUpload(filePath, objectKey, deleteAfter) + } +} +``` ### Key Interfaces @@ -80,6 +143,18 @@ go test ./... 4. **Transformers** can process streams between publishers and subscribers 5. **Plugins** provide protocol-specific implementations +### Post-Recording Workflow + +Monibuca implements a sophisticated post-recording processing pipeline: + +1. **Recording Completion:** MP4 recorder finishes writing stream data +2. **Trailer Writing:** Asynchronous task moves MOOV box to file beginning for web compatibility +3. **File Optimization:** Temporary file operations ensure atomic updates +4. **External Storage Integration:** Automatic upload to S3-compatible services +5. **Cleanup:** Optional local file deletion after successful upload + +This workflow uses queue-based task processing to avoid blocking the main recording pipeline. + ## Plugin Development ### Creating a Plugin @@ -100,6 +175,81 @@ go test ./... 3. **Run:** Active operation 4. **Dispose:** Cleanup and shutdown +### Cross-Plugin Communication Patterns + +#### 1. Global Instance Pattern +```go +// Expose global instance for cross-plugin access +var s3PluginInstance *S3Plugin + +func (p *S3Plugin) Start() error { + s3PluginInstance = p // Set global instance + // ... rest of start logic +} + +// Provide public API functions +func TriggerUpload(filePath string, deleteAfter bool) { + if s3PluginInstance != nil { + s3PluginInstance.QueueUpload(filePath, objectKey, deleteAfter) + } +} +``` + +#### 2. Event-Driven Integration +```go +// In one plugin: trigger event after completion +if t.filePath != "" { + t.Info("MP4 file processing completed, triggering S3 upload") + s3plugin.TriggerUpload(t.filePath, false) +} +``` + +#### 3. Shared Queue Managers +Multiple plugins can share Work instances for coordinated processing. + +### Asynchronous Task Development Best Practices + +#### 1. Implement Task Interfaces +```go +type MyTask struct { + task.Task + // ... custom fields +} + +func (t *MyTask) Start() error { + // Initialize resources, validate inputs + return nil +} + +func (t *MyTask) Run() error { + // Main work execution + // Return task.ErrTaskComplete for successful completion + return nil +} +``` + +#### 2. Use Work for Queue Management +```go +type MyQueueManager struct { + task.Work +} + +var myQueue MyQueueManager + +func init() { + m7s.Servers.AddTask(&myQueue) +} + +// Queue tasks from anywhere +myQueue.AddTask(&MyTask{...}, logger) +``` + +#### 3. Error Handling and Retry +- Tasks automatically support retry mechanisms +- Use `task.SetRetry(maxRetry, interval)` for custom retry behavior +- Return `task.ErrTaskComplete` for successful completion +- Return other errors to trigger retry or failure handling + ## Configuration Structure ### Global Configuration @@ -131,8 +281,9 @@ Automatic migration is handled for core models including users, proxies, and str - **WebRTC:** Web real-time communication - **GB28181:** Chinese surveillance standard - **FLV:** Flash video format -- **MP4:** MPEG-4 format +- **MP4:** MPEG-4 format with post-processing capabilities - **SRT:** Secure reliable transport +- **S3:** File upload integration with AWS S3/MinIO compatibility ## Authentication & Security @@ -154,11 +305,18 @@ Automatic migration is handled for core models including users, proxies, and str - Integration tests can use the example configurations - Use the mock.py script for protocol testing +### Async Task Development +- Always use Work instances for queue management +- Implement proper Start/Run lifecycle in tasks +- Use global instance pattern for cross-plugin communication +- Handle errors gracefully with appropriate retry strategies + ### Performance Considerations - Memory pool is enabled by default (disable with `disable_rm`) - Zero-copy design for media data where possible - Lock-free data structures for high concurrency - Efficient buffer management with ring buffers +- Queue-based processing prevents blocking main threads ## Debugging @@ -173,6 +331,12 @@ Automatic migration is handled for core models including users, proxies, and str - Log rotation support - Fatal crash logging +### Task System Debugging +- Tasks automatically include detailed logging with task IDs and types +- Use `task.Debug/Info/Warn/Error` methods for consistent logging +- Task state and progress can be monitored through descriptions +- Event loop status and queue lengths are logged automatically + ## Web Admin Interface - Web-based admin UI served from `admin.zip` @@ -196,4 +360,10 @@ Automatic migration is handled for core models including users, proxies, and str ### Plugin Loading - Plugins are auto-discovered from imports - Check plugin enable/disable status -- Verify configuration merging \ No newline at end of file +- Verify configuration merging + +### Task System Issues +- Ensure Work instances are added to server during initialization +- Check task queue status if tasks aren't executing +- Verify proper error handling in task implementation +- Monitor task retry counts and failure reasons in logs \ No newline at end of file diff --git a/GEMINI.md b/GEMINI.md new file mode 100644 index 0000000..76e5b38 --- /dev/null +++ b/GEMINI.md @@ -0,0 +1,92 @@ +# Gemini Context: Monibuca Project + +This document provides a summary of the Monibuca project to give context for AI-assisted development. + +## Project Overview + +Monibuca is a modular, high-performance streaming media server framework written in Go. Its core design is lightweight and plugin-based, allowing developers to extend functionality by adding or developing plugins for different streaming protocols and features. The project's module path is `m7s.live/v4`. + +The architecture is centered around a core engine (`m7s.live/v4`) that manages plugins, streams, and the main event loop. Functionality is added by importing plugins, which register themselves with the core engine. + +**Key Technologies:** +- **Language:** Go +- **Architecture:** Plugin-based +- **APIs:** RESTful HTTP API, gRPC API + +**Supported Protocols (based on plugins):** +- RTMP +- RTSP +- HLS +- FLV +- WebRTC +- GB28181 +- SRT +- And more... + +## Building and Running + +### Build +To build the server, run the following command from the project root: +```bash +go build -v . +``` + +### Test +To run the test suite: +```bash +go test -v ./... +``` + +### Running the Server +The server is typically run by creating a `main.go` file that imports the core engine and the desired plugins. + +**Example `main.go`:** +```go +package main + +import ( + "m7s.live/v4" + // Import desired plugins to register them + _ "m7s.live/plugin/rtmp/v4" + _ "m7s.live/plugin/rtsp/v4" + _ "m7s.live/plugin/hls/v4" + _ "m7s.live/plugin/webrtc/v4" +) + +func main() { + m7s.Run() +} +``` +The server is executed by running `go run main.go`. Configuration is managed through a `config.yaml` file in the same directory. + +### Docker +The project includes a `Dockerfile` to build and run in a container. +```bash +# Build the image +docker build -t monibuca . + +# Run the container +docker run -p 8080:8080 monibuca +``` + +## Development Conventions + +### Project Structure +- `server.go`: Core engine logic. +- `plugin/`: Contains individual plugins for different protocols and features. +- `pkg/`: Shared packages and utilities used across the project. +- `pb/`: Protobuf definitions for the gRPC API. +- `example/`: Example implementations and configurations. +- `doc/`: Project documentation. + +### Plugin System +The primary way to add functionality is by creating or enabling plugins. A plugin is a Go package that registers itself with the core engine upon import (using the `init()` function). This modular approach keeps the core small and allows for custom builds with only the necessary features. + +### API +- **RESTful API:** Defined in `api.go`, provides HTTP endpoints for controlling and monitoring the server. +- **gRPC API:** Defined in the `pb/` directory using protobuf. `protoc.sh` is used to generate the Go code from the `.proto` files. + +### Code Style and CI +- The project uses `golangci-lint` for linting, as seen in the `.github/workflows/go.yml` file. +- Static analysis is configured via `staticcheck.conf` and `qodana.yaml`. +- All code should be formatted with `gofmt`. diff --git a/IFLOW.md b/IFLOW.md new file mode 100644 index 0000000..2427df3 --- /dev/null +++ b/IFLOW.md @@ -0,0 +1,124 @@ +# Monibuca v5 项目概述 + +Monibuca 是一个使用纯 Go 语言开发的、高度可扩展的高性能流媒体服务器开发框架。它旨在提供高并发、低延迟的流媒体处理能力,并支持多种流媒体协议和功能。 + +## 核心特性 + +* **高性能**: 采用无锁设计、部分手动内存管理和多核计算。 +* **低延迟**: 实现零等待转发,全链路亚秒级延迟。 +* **模块化**: 按需加载,无限扩展性。 +* **灵活性**: 高度可配置,适应各种流媒体场景。 +* **可扩展性**: 支持分布式部署,轻松应对大规模场景。 +* **调试友好**: 内置调试插件,实时性能监控与分析。 +* **媒体处理**: 支持截图、转码、SEI 数据处理。 +* **集群能力**: 内置级联和房间管理。 +* **预览功能**: 支持视频预览、多屏预览、自定义屏幕布局。 +* **安全性**: 提供加密传输和流认证。 +* **性能监控**: 支持压力测试和性能指标收集(集成在测试插件中)。 +* **日志管理**: 日志轮转、自动清理、自定义扩展。 +* **录制与回放**: 支持 MP4、HLS、FLV 格式,支持倍速、寻址、暂停。 +* **动态时移**: 动态缓存设计,支持直播时移回放。 +* **远程调用**: 支持 gRPC 接口,实现跨语言集成。 +* **流别名**: 支持动态流别名,灵活的多流管理。 +* **AI 能力**: 集成推理引擎,支持 ONNX 模型,支持自定义前后处理。 +* **WebHook**: 订阅流生命周期事件,用于业务系统集成。 +* **私有协议**: 支持自定义私有协议以满足特殊业务需求。 + +## 支持的协议 + +* RTMP +* RTSP +* HTTP-FLV +* WS-FLV +* HLS +* WebRTC +* GB28181 +* ONVIF +* SRT + +## 技术架构 + +Monibuca 基于插件化架构设计,核心功能通过插件扩展。主要组件包括: + +* **Server**: 核心服务器,负责管理流、插件、任务等。 +* **Plugin**: 插件系统,提供各种功能扩展。 +* **Publisher**: 流发布者,负责接收和管理流数据。 +* **Subscriber**: 流订阅者,负责消费流数据。 +* **Task**: 任务系统,用于管理异步任务和生命周期。 +* **Config**: 配置系统,支持多层级配置(环境变量、配置文件、默认值等)。 + +## 构建与运行 + +### 前提条件 + +* Go 1.23 或更高版本 +* 对流媒体协议有基本了解 + +### 运行默认配置 + +```bash +cd example/default +go run -tags sqlite main.go +``` + +### 构建标签 + +可以使用以下构建标签来自定义构建: + +| 构建标签 | 描述 | +| :--- | :--- | +| `disable_rm` | 禁用内存池 | +| `sqlite` | 启用 sqlite DB | +| `sqliteCGO` | 启用 sqlite cgo 版本 DB | +| `mysql` | 启用 mysql DB | +| `postgres` | 启用 postgres DB | +| `duckdb` | 启用 duckdb DB | +| `taskpanic` | 抛出 panic,用于测试 | +| `fasthttp` | 启用 fasthttp 服务器而不是 net/http | + +### Web UI + +将 `admin.zip` 文件(不要解压)放在与配置文件相同的目录中。然后访问 http://localhost:8080 即可访问 UI。 + +## 开发约定 + +### 项目结构 + +* `example/`: 包含各种使用示例。 +* `pkg/`: 核心库代码。 +* `plugin/`: 各种功能插件。 +* `pb/`: Protocol Buffer 生成的代码。 +* `doc/`: 项目文档。 +* `scripts/`: 脚本文件。 + +### 配置 + +* 使用 YAML 格式进行配置。 +* 支持多层级配置覆盖(环境变量 > 配置文件 > 默认值)。 +* 插件配置通常以插件名小写作为前缀。 + +### 日志 + +* 使用 `slog` 进行日志记录。 +* 支持不同日志级别(debug, info, warn, error, trace)。 +* 插件可以有自己的日志记录器。 + +### 插件开发 + +* 插件需要实现 `IPlugin` 接口。 +* 通过 `InstallPlugin` 函数注册插件。 +* 插件可以注册 HTTP 处理函数、gRPC 服务等。 +* 插件可以有自己的配置结构体。 + +### 任务系统 + +* 使用 `task` 包管理异步任务。 +* 任务具有生命周期管理(启动、停止、销毁)。 +* 任务可以有父子关系,形成任务树。 +* 支持任务重试机制。 + +### 测试 + +* 使用 Go 标准测试包 `testing`。 +* 在 `test/` 目录下编写集成测试。 +* 使用 `example/test` 目录进行功能测试。 \ No newline at end of file diff --git a/example/default/config.yaml b/example/default/config.yaml index 97e8d81..5d8af48 100755 --- a/example/default/config.yaml +++ b/example/default/config.yaml @@ -94,6 +94,17 @@ hls: # live/test: rtsp://admin:1qaz2wsx3EDC@giroro.tpddns.cn:1554/Streaming/Channels/101 # live/test: rtsp://admin:1qaz2wsx3EDC@localhost:8554/live/test +s3: + auto: true # 启用自动上传 + deleteAfterUpload: false # 上传后保留本地文件 + endpoint: "storage-dev.xiding.tech" + accessKeyId: "xidinguser" + secretAccessKey: "U2FsdGVkX1/7uyvj0trCzSNFsfDZ66dMSAEZjNlvW1c=" + bucket: "vidu-media-bucket" + pathPrefix: "recordings" + forcePathStyle: true + useSSL: true + snap: enable: false onpub: diff --git a/plugin/mp4/pkg/record.go b/plugin/mp4/pkg/record.go index 641aa2c..e2fccc5 100644 --- a/plugin/mp4/pkg/record.go +++ b/plugin/mp4/pkg/record.go @@ -13,6 +13,7 @@ import ( "m7s.live/v5/pkg/config" "m7s.live/v5/pkg/task" "m7s.live/v5/plugin/mp4/pkg/box" + s3plugin "m7s.live/v5/plugin/s3" ) type WriteTrailerQueueTask struct { @@ -23,8 +24,9 @@ var writeTrailerQueueTask WriteTrailerQueueTask type writeTrailerTask struct { task.Task - muxer *Muxer - file *os.File + muxer *Muxer + file *os.File + filePath string } func (task *writeTrailerTask) Start() (err error) { @@ -102,6 +104,13 @@ func (t *writeTrailerTask) Run() (err error) { if err = temp.Close(); err != nil { t.Error("close temp file", "err", err) } + + // MP4文件处理完成后,触发S3上传 + if t.filePath != "" { + t.Info("MP4 file processing completed, triggering S3 upload", "filePath", t.filePath) + s3plugin.TriggerUpload(t.filePath, false) // 不删除本地文件,让用户配置决定 + } + return } @@ -122,8 +131,9 @@ type Recorder struct { func (r *Recorder) writeTailer(end time.Time) { r.WriteTail(end, &writeTrailerQueueTask) writeTrailerQueueTask.AddTask(&writeTrailerTask{ - muxer: r.muxer, - file: r.file, + muxer: r.muxer, + file: r.file, + filePath: r.Event.FilePath, }, r.Logger) } diff --git a/plugin/s3/index.go b/plugin/s3/index.go index 08cab31..3bc6055 100644 --- a/plugin/s3/index.go +++ b/plugin/s3/index.go @@ -3,6 +3,7 @@ package plugin_s3 import ( "fmt" "os" + "path/filepath" "strings" "github.com/aws/aws-sdk-go/aws" @@ -10,23 +11,41 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "m7s.live/v5" + "m7s.live/v5/pkg/task" "m7s.live/v5/plugin/s3/pb" ) +// 上传任务队列工作器 +type UploadQueueTask struct { + task.Work +} + +var uploadQueueTask UploadQueueTask + +// 文件上传任务 +type FileUploadTask struct { + task.Task + plugin *S3Plugin + filePath string + objectKey string + deleteAfterUpload bool +} + type S3Plugin struct { pb.UnimplementedApiServer m7s.Plugin - Endpoint string `desc:"S3 service endpoint, such as MinIO address"` - Region string `default:"us-east-1" desc:"AWS region"` - AccessKeyID string `desc:"S3 access key ID"` - SecretAccessKey string `desc:"S3 secret access key"` - Bucket string `desc:"S3 bucket name"` - PathPrefix string `desc:"file path prefix"` - ForcePathStyle bool `desc:"force path style (required for MinIO)"` - UseSSL bool `default:"true" desc:"whether to use SSL"` - Auto bool `desc:"whether to automatically upload recorded files"` - Timeout int `default:"30" desc:"upload timeout in seconds"` - s3Client *s3.S3 + Endpoint string `desc:"S3 service endpoint, such as MinIO address"` + Region string `default:"us-east-1" desc:"AWS region"` + AccessKeyID string `desc:"S3 access key ID"` + SecretAccessKey string `desc:"S3 secret access key"` + Bucket string `desc:"S3 bucket name"` + PathPrefix string `desc:"file path prefix"` + ForcePathStyle bool `desc:"force path style (required for MinIO)"` + UseSSL bool `default:"true" desc:"whether to use SSL"` + Auto bool `desc:"whether to automatically upload recorded files"` + DeleteAfterUpload bool `desc:"whether to delete local file after successful upload"` + Timeout int `default:"30" desc:"upload timeout in seconds"` + s3Client *s3.S3 } var _ = m7s.InstallPlugin[S3Plugin](m7s.PluginMeta{ @@ -34,7 +53,18 @@ var _ = m7s.InstallPlugin[S3Plugin](m7s.PluginMeta{ RegisterGRPCHandler: pb.RegisterApiHandler, }) +// 全局S3插件实例 +var s3PluginInstance *S3Plugin + +func init() { + // 将上传队列任务添加到服务器 + m7s.Servers.AddTask(&uploadQueueTask) +} + func (p *S3Plugin) Start() error { + // 设置全局实例 + s3PluginInstance = p + // Set default configuration if p.Region == "" { p.Region = "us-east-1" @@ -129,3 +159,78 @@ func (p *S3Plugin) uploadFile(filePath, objectKey string) error { p.Info("File uploaded successfully", "objectKey", objectKey, "size", fileInfo.Size()) return nil } + +// FileUploadTask的Start方法 +func (task *FileUploadTask) Start() error { + task.Info("Starting file upload", "filePath", task.filePath, "objectKey", task.objectKey) + return nil +} + +// FileUploadTask的Run方法 +func (task *FileUploadTask) Run() error { + // 检查文件是否存在 + if _, err := os.Stat(task.filePath); os.IsNotExist(err) { + return fmt.Errorf("file does not exist: %s", task.filePath) + } + + // 执行上传 + err := task.plugin.uploadFile(task.filePath, task.objectKey) + if err != nil { + task.Error("Failed to upload file", "error", err, "filePath", task.filePath) + return err + } + + // 如果配置了上传后删除,则删除本地文件 + if task.deleteAfterUpload { + if err := os.Remove(task.filePath); err != nil { + task.Warn("Failed to delete local file after upload", "error", err, "filePath", task.filePath) + } else { + task.Info("Local file deleted after successful upload", "filePath", task.filePath) + } + } + + task.Info("File upload completed successfully", "filePath", task.filePath, "objectKey", task.objectKey) + return nil +} + +// 队列上传文件方法 +func (p *S3Plugin) QueueUpload(filePath, objectKey string, deleteAfter bool) { + if !p.Auto { + p.Debug("Auto upload is disabled, skipping upload", "filePath", filePath) + return + } + + uploadTask := &FileUploadTask{ + plugin: p, + filePath: filePath, + objectKey: objectKey, + deleteAfterUpload: deleteAfter || p.DeleteAfterUpload, + } + + // 将上传任务添加到队列 + uploadQueueTask.AddTask(uploadTask, p.Logger.With("filePath", filePath, "objectKey", objectKey)) + p.Info("File upload queued", "filePath", filePath, "objectKey", objectKey) +} + +// 生成S3对象键的辅助方法 +func (p *S3Plugin) generateObjectKey(filePath string) string { + // 获取文件名 + fileName := filepath.Base(filePath) + + // 如果配置了路径前缀,则添加前缀 + if p.PathPrefix != "" { + return strings.TrimSuffix(p.PathPrefix, "/") + "/" + fileName + } + + return fileName +} + +// TriggerUpload 全局函数,供其他插件调用以触发S3上传 +func TriggerUpload(filePath string, deleteAfter bool) { + if s3PluginInstance == nil { + return // S3插件未启用或未初始化 + } + + objectKey := s3PluginInstance.generateObjectKey(filePath) + s3PluginInstance.QueueUpload(filePath, objectKey, deleteAfter) +}