1010 lines
25 KiB
Markdown
1010 lines
25 KiB
Markdown
# Go语言规则引擎
|
||
|
||
一个基于Go语言实现的灵活、高性能规则引擎,用于实现不同协议之间的数据转换和路由。
|
||
|
||
## 功能特性
|
||
|
||
- **多协议支持**:支持MQTT、HTTP API、WebSocket、NATS、Redis等不同协议之间的数据交互
|
||
- **灵活的规则配置**:基于JSON配置的提取规则和构建规则
|
||
- **事件驱动模式**:支持基于事件的数据处理模式,如MQTT订阅
|
||
- **轮询模式**:支持定时轮询获取数据的处理模式
|
||
- **RESTful API**:完整的规则管理API接口
|
||
- **动态规则管理**:支持规则的热加载和动态更新
|
||
- **可视化配置界面**:内置Web管理界面,支持规则的可视化配置
|
||
- **规则诊断功能**:提供规则测试和诊断功能
|
||
- **高性能设计**:基于Go语言的并发模型实现
|
||
- **可靠性保障**:完善的错误处理和日志记录
|
||
- **分布式追踪支持**:集成OpenTelemetry分布式追踪功能
|
||
- **指标监控**:支持Prometheus指标收集和监控
|
||
|
||
## 系统架构
|
||
|
||

|
||
|
||
规则引擎由以下核心模块组成:
|
||
|
||
### 1. 规则管理模块
|
||
|
||
负责规则的增删改查和持久化存储,主要包含:
|
||
- 规则配置文件加载与解析
|
||
- 规则CRUD操作接口
|
||
- 规则状态管理(启用/禁用)
|
||
- 规则版本管理
|
||
|
||
### 2. 协议适配器模块
|
||
|
||
负责不同协议的数据获取和数据推送,主要包含:
|
||
- **MQTT适配器**:支持MQTT协议的发布与订阅
|
||
- **HTTP适配器**:支持HTTP API的请求与响应
|
||
- **WebSocket适配器**:支持WebSocket的连接与通信
|
||
- **NATS适配器**:支持NATS消息系统的发布与订阅
|
||
- **Redis适配器**:支持Redis的Pub/Sub功能
|
||
- 适配器注册表:动态管理不同类型的适配器
|
||
- 适配器工厂:根据规则创建对应的适配器实例
|
||
|
||
协议适配器实现了以下核心接口:
|
||
- `DataFetcher`:数据获取接口
|
||
- `DataPusher`:数据推送接口
|
||
- `EventDrivenFetcher`:事件驱动数据获取接口
|
||
|
||
### 3. 数据处理引擎
|
||
|
||
负责数据提取和消息构建的核心逻辑,主要包含:
|
||
- JSON数据解析与处理
|
||
- 基于规则的数据提取
|
||
- 基于模板的消息构建
|
||
- 支持JavaScript表达式的数据转换
|
||
|
||
### 4. API模块
|
||
|
||
提供完整的RESTful管理接口,支持:
|
||
- 规则的CRUD操作
|
||
- 规则启动/停止控制
|
||
- 规则测试与诊断
|
||
- 系统状态监控
|
||
|
||
### 5. Web管理界面
|
||
|
||
提供直观的可视化配置界面,支持:
|
||
- 规则列表管理
|
||
- 可视化规则编辑
|
||
- 规则测试工具
|
||
- 运行状态监控
|
||
- 系统配置管理
|
||
|
||
### 6. 可观测性模块
|
||
|
||
提供系统监控和故障诊断能力:
|
||
- 分布式追踪:基于OpenTelemetry
|
||
- 性能指标:支持Prometheus指标收集
|
||
- 结构化日志:多级别、多输出的日志系统
|
||
|
||
## 快速开始
|
||
|
||
### 环境要求
|
||
|
||
- Go 1.19或更高版本
|
||
- 若使用MQTT功能,需要有可访问的MQTT Broker
|
||
- 若使用NATS功能,需要有可访问的NATS服务器
|
||
- 若使用Redis功能,需要有可访问的Redis服务器
|
||
|
||
### 从源码安装
|
||
|
||
```bash
|
||
# 克隆代码库
|
||
git clone https://github.com/darkit/rule-engine.git
|
||
|
||
# 进入项目目录
|
||
cd rule-engine
|
||
|
||
# 安装依赖
|
||
go mod tidy
|
||
|
||
# 编译
|
||
go build -o rule-engine cmd/server/main.go
|
||
```
|
||
|
||
### 使用Docker运行
|
||
|
||
```bash
|
||
# 构建Docker镜像
|
||
docker build -t rule-engine .
|
||
|
||
# 运行容器
|
||
docker run -p 8080:8080 -v ./config.yaml:/app/config.yaml -v ./rules:/app/rules rule-engine
|
||
```
|
||
|
||
### 使用Docker Compose运行
|
||
|
||
```bash
|
||
# 启动服务
|
||
docker-compose up -d
|
||
```
|
||
|
||
## 配置指南
|
||
|
||
### 主配置文件
|
||
|
||
编辑 `config.yaml` 文件:
|
||
|
||
```yaml
|
||
server:
|
||
port: 8080 # 服务器端口
|
||
host: 0.0.0.0 # 监听地址
|
||
read_timeout: 60 # 读取超时时间(秒)
|
||
write_timeout: 60 # 写入超时时间(秒)
|
||
shutdown_timeout: 10 # 优雅关闭超时时间(秒)
|
||
|
||
storage:
|
||
type: file # 存储类型:file/db
|
||
rule_path: ./rules # 规则文件存储路径
|
||
|
||
logger:
|
||
level: info # 日志级别:debug/info/warn/error
|
||
file: ./logs/rule-engine.log # 日志文件路径
|
||
max_size: 10 # 单个日志文件最大大小(MB)
|
||
max_backup: 5 # 保留的旧日志文件数量
|
||
max_age: 30 # 旧日志文件保留最大天数
|
||
compress: true # 是否压缩旧日志文件
|
||
console: true # 是否输出到控制台
|
||
|
||
security:
|
||
enable_tls: false # 是否启用TLS
|
||
cert_file: "" # TLS证书文件路径
|
||
key_file: "" # TLS密钥文件路径
|
||
allowed_origins: "*" # CORS允许的来源
|
||
|
||
telemetry:
|
||
trace:
|
||
enabled: true # 是否启用分布式追踪
|
||
service_name: "rule-engine" # 服务名称
|
||
service_version: "1.0.0" # 服务版本
|
||
environment: "development" # 环境名称
|
||
sampling_ratio: 0.1 # 采样率
|
||
exporter_type: "jaeger" # 导出类型:jaeger/otlp/stdout/none
|
||
jaeger_endpoint: "http://localhost:14268/api/traces" # Jaeger接入点
|
||
otlp_endpoint: "localhost:4317" # OTLP接入点
|
||
```
|
||
|
||
### 规则配置示例
|
||
|
||
规则以JSON格式存储,每个规则对应一个JSON文件,存放在`rules`目录下:
|
||
|
||
```json
|
||
{
|
||
"id": "rule-1",
|
||
"name": "MQTT到HTTP的转换规则",
|
||
"input_config": {
|
||
"source_type": "MQTT",
|
||
"address": "tcp://broker.example.com:1883",
|
||
"topic": "sensors/temperature",
|
||
"auth": {
|
||
"username": "mqtt_user",
|
||
"password": "mqtt_password"
|
||
}
|
||
},
|
||
"output_config": {
|
||
"target_type": "HTTPAPI",
|
||
"address": "https://api.example.com/data",
|
||
"headers": {
|
||
"Content-Type": "application/json",
|
||
"Authorization": "Bearer token123"
|
||
}
|
||
},
|
||
"extract_rule": "{\"temp\": \"$.temperature\", \"humidity\": \"$.humidity\", \"timestamp\": \"$.time\"}",
|
||
"build_rule": "{\"value\": \"$temp\", \"humidity_level\": \"$humidity\", \"recorded_at\": \"$timestamp\", \"device_type\": \"temperature_sensor\"}",
|
||
"interval": 10,
|
||
"enabled": true
|
||
}
|
||
```
|
||
|
||
### 规则配置详解
|
||
|
||
#### 输入配置 (input_config)
|
||
|
||
| 字段 | 类型 | 描述 | 适用协议 |
|
||
|-----|-----|------|---------|
|
||
| source_type | string | 数据源类型:MQTT/HTTPAPI/WEBSOCKET/NATS/REDIS | 全部 |
|
||
| address | string | 数据源地址 | 全部 |
|
||
| request_template | string | 请求模板 | HTTP |
|
||
| headers | map | HTTP头信息 | HTTP |
|
||
| topic | string | 主题/通道名称 | MQTT/NATS/REDIS |
|
||
| auth | object | 认证信息 | 全部 |
|
||
|
||
#### 输出配置 (output_config)
|
||
|
||
| 字段 | 类型 | 描述 | 适用协议 |
|
||
|-----|-----|------|---------|
|
||
| target_type | string | 目标类型:MQTT/HTTPAPI/WEBSOCKET/NATS/REDIS | 全部 |
|
||
| address | string | 目标地址 | 全部 |
|
||
| field_template | string | 字段模板(可选) | 全部 |
|
||
| headers | map | HTTP头信息 | HTTP |
|
||
| topic | string | 主题/通道名称 | MQTT/NATS/REDIS |
|
||
| auth | object | 认证信息 | 全部 |
|
||
|
||
#### 提取规则 (extract_rule)
|
||
|
||
采用JSON格式,基于JSONPath表达式提取数据:
|
||
|
||
```json
|
||
{
|
||
"字段名": "$.json.path.表达式",
|
||
"温度": "$.data.temperature",
|
||
"湿度": "$.data.humidity"
|
||
}
|
||
```
|
||
|
||
#### 构建规则 (build_rule)
|
||
|
||
采用JSON格式,使用`$变量名`引用提取的数据:
|
||
|
||
```json
|
||
{
|
||
"value": "$温度",
|
||
"humidity": "$湿度",
|
||
"device_id": "sensor-01",
|
||
"timestamp": "$时间戳"
|
||
}
|
||
```
|
||
|
||
## API接口文档
|
||
|
||
### 规则管理接口
|
||
|
||
#### 获取所有规则
|
||
|
||
```
|
||
GET /api/rules
|
||
```
|
||
|
||
**响应示例**:
|
||
|
||
```json
|
||
{
|
||
"code": 0,
|
||
"message": "success",
|
||
"data": [
|
||
{
|
||
"id": "rule-1",
|
||
"name": "MQTT到HTTP的转换规则",
|
||
"input_config": { /* ... */ },
|
||
"output_config": { /* ... */ },
|
||
"extract_rule": "{ /* ... */ }",
|
||
"build_rule": "{ /* ... */ }",
|
||
"interval": 10,
|
||
"enabled": true
|
||
}
|
||
]
|
||
}
|
||
```
|
||
|
||
#### 获取单个规则
|
||
|
||
```
|
||
GET /api/rules/{id}
|
||
```
|
||
|
||
**响应示例**:
|
||
|
||
```json
|
||
{
|
||
"code": 0,
|
||
"message": "success",
|
||
"data": {
|
||
"id": "rule-1",
|
||
"name": "MQTT到HTTP的转换规则",
|
||
"input_config": { /* ... */ },
|
||
"output_config": { /* ... */ },
|
||
"extract_rule": "{ /* ... */ }",
|
||
"build_rule": "{ /* ... */ }",
|
||
"interval": 10,
|
||
"enabled": true
|
||
}
|
||
}
|
||
```
|
||
|
||
#### 创建规则
|
||
|
||
```
|
||
POST /api/rules
|
||
```
|
||
|
||
**请求体**:
|
||
|
||
```json
|
||
{
|
||
"id": "rule-1",
|
||
"name": "MQTT到HTTP的转换规则",
|
||
"input_config": { /* ... */ },
|
||
"output_config": { /* ... */ },
|
||
"extract_rule": "{ /* ... */ }",
|
||
"build_rule": "{ /* ... */ }",
|
||
"interval": 10,
|
||
"enabled": true
|
||
}
|
||
```
|
||
|
||
**响应示例**:
|
||
|
||
```json
|
||
{
|
||
"code": 0,
|
||
"message": "规则创建成功",
|
||
"data": {
|
||
"id": "rule-1"
|
||
}
|
||
}
|
||
```
|
||
|
||
#### 更新规则
|
||
|
||
```
|
||
PUT /api/rules/{id}
|
||
```
|
||
|
||
**请求体**:同创建规则
|
||
|
||
**响应示例**:
|
||
|
||
```json
|
||
{
|
||
"code": 0,
|
||
"message": "规则更新成功",
|
||
"data": null
|
||
}
|
||
```
|
||
|
||
#### 删除规则
|
||
|
||
```
|
||
DELETE /api/rules/{id}
|
||
```
|
||
|
||
**响应示例**:
|
||
|
||
```json
|
||
{
|
||
"code": 0,
|
||
"message": "规则删除成功",
|
||
"data": null
|
||
}
|
||
```
|
||
|
||
### 规则操作接口
|
||
|
||
#### 启动规则
|
||
|
||
```
|
||
POST /api/rules/{id}/start
|
||
```
|
||
|
||
**响应示例**:
|
||
|
||
```json
|
||
{
|
||
"code": 0,
|
||
"message": "规则启动成功",
|
||
"data": null
|
||
}
|
||
```
|
||
|
||
#### 停止规则
|
||
|
||
```
|
||
POST /api/rules/{id}/stop
|
||
```
|
||
|
||
**响应示例**:
|
||
|
||
```json
|
||
{
|
||
"code": 0,
|
||
"message": "规则停止成功",
|
||
"data": null
|
||
}
|
||
```
|
||
|
||
#### 测试规则
|
||
|
||
```
|
||
POST /api/rules/{id}/test
|
||
```
|
||
|
||
**请求体**:
|
||
|
||
```json
|
||
{
|
||
"test_data": { /* 测试输入数据 */ },
|
||
"extract_rule": { /* 可选:提取规则 */ },
|
||
"build_rule": { /* 可选:构建规则 */ }
|
||
}
|
||
```
|
||
|
||
**响应示例**:
|
||
|
||
```json
|
||
{
|
||
"code": 0,
|
||
"message": "规则测试成功",
|
||
"data": {
|
||
"input_data": { /* 输入数据 */ },
|
||
"extracted_data": { /* 提取的数据 */ },
|
||
"output_data": { /* 构建的输出数据 */ }
|
||
}
|
||
}
|
||
```
|
||
|
||
#### 诊断规则
|
||
|
||
```
|
||
POST /api/rules/{id}/diagnose
|
||
```
|
||
|
||
**响应示例**:
|
||
|
||
```json
|
||
{
|
||
"code": 0,
|
||
"message": "规则诊断成功",
|
||
"data": {
|
||
"rule_id": "rule-1",
|
||
"status": "running",
|
||
"last_execution": "2023-03-15T08:10:30Z",
|
||
"execution_count": 156,
|
||
"success_count": 152,
|
||
"error_count": 4,
|
||
"last_error": "连接超时",
|
||
"performance": {
|
||
"avg_time": 28.5,
|
||
"max_time": 120.3,
|
||
"min_time": 10.2
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
### 系统状态接口
|
||
|
||
```
|
||
GET /api/status
|
||
```
|
||
|
||
**响应示例**:
|
||
|
||
```json
|
||
{
|
||
"code": 0,
|
||
"message": "success",
|
||
"data": {
|
||
"uptime": "3d 5h 12m 30s",
|
||
"version": "1.0.0",
|
||
"rules": {
|
||
"total": 5,
|
||
"running": 3,
|
||
"stopped": 2
|
||
},
|
||
"system": {
|
||
"cpu_usage": 2.5,
|
||
"memory_usage": "128MB",
|
||
"goroutines": 15
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
## 使用示例
|
||
|
||
### 示例1:MQTT转HTTP
|
||
|
||
创建一个从MQTT接收温度传感器数据并转发到HTTP API的规则:
|
||
|
||
```json
|
||
{
|
||
"id": "mqtt-to-http",
|
||
"name": "温度传感器数据转发",
|
||
"input_config": {
|
||
"source_type": "MQTT",
|
||
"address": "tcp://mqtt.example.com:1883",
|
||
"topic": "sensors/temperature",
|
||
"auth": {
|
||
"username": "mqtt_user",
|
||
"password": "mqtt_pass"
|
||
}
|
||
},
|
||
"output_config": {
|
||
"target_type": "HTTPAPI",
|
||
"address": "https://api.example.com/data",
|
||
"headers": {
|
||
"Content-Type": "application/json",
|
||
"Authorization": "Bearer token123"
|
||
}
|
||
},
|
||
"extract_rule": "{\"device_id\": \"$.device_id\", \"temperature\": \"$.readings.temperature\", \"humidity\": \"$.readings.humidity\", \"timestamp\": \"$.timestamp\"}",
|
||
"build_rule": "{\"device\": \"$device_id\", \"readings\": {\"temp\": \"$temperature\", \"hum\": \"$humidity\"}, \"time\": \"$timestamp\", \"source\": \"sensor-network\"}",
|
||
"interval": 0,
|
||
"enabled": true
|
||
}
|
||
```
|
||
|
||
### 示例2:HTTP轮询转MQTT
|
||
|
||
创建一个定期从HTTP API获取数据并发布到MQTT的规则:
|
||
|
||
```json
|
||
{
|
||
"id": "http-poll-to-mqtt",
|
||
"name": "天气数据采集",
|
||
"input_config": {
|
||
"source_type": "HTTPAPI",
|
||
"address": "https://api.weather.com/current?location=beijing",
|
||
"headers": {
|
||
"API-Key": "your-api-key"
|
||
}
|
||
},
|
||
"output_config": {
|
||
"target_type": "MQTT",
|
||
"address": "tcp://mqtt.example.com:1883",
|
||
"topic": "weather/beijing",
|
||
"auth": {
|
||
"username": "mqtt_user",
|
||
"password": "mqtt_pass"
|
||
}
|
||
},
|
||
"extract_rule": "{\"temperature\": \"$.current.temperature\", \"humidity\": \"$.current.humidity\", \"wind\": \"$.current.wind\", \"updated_at\": \"$.current.updated_at\"}",
|
||
"build_rule": "{\"weather\": {\"temp\": \"$temperature\", \"humidity\": \"$humidity\", \"wind_speed\": \"$wind.speed\", \"wind_direction\": \"$wind.direction\"}, \"location\": \"beijing\", \"timestamp\": \"$updated_at\"}",
|
||
"interval": 300,
|
||
"enabled": true
|
||
}
|
||
```
|
||
|
||
### 示例3:WebSocket到MQTT
|
||
|
||
创建一个从WebSocket接收实时数据并转发到MQTT的规则:
|
||
|
||
```json
|
||
{
|
||
"id": "websocket-to-mqtt",
|
||
"name": "股票行情转发",
|
||
"input_config": {
|
||
"source_type": "WEBSOCKET",
|
||
"address": "wss://realtime.stock-api.com/quotes",
|
||
"headers": {
|
||
"Authorization": "Bearer token123"
|
||
}
|
||
},
|
||
"output_config": {
|
||
"target_type": "MQTT",
|
||
"address": "tcp://mqtt.example.com:1883",
|
||
"topic": "stocks/realtime",
|
||
"auth": {
|
||
"username": "mqtt_user",
|
||
"password": "mqtt_pass"
|
||
}
|
||
},
|
||
"extract_rule": "{\"symbol\": \"$.symbol\", \"price\": \"$.price\", \"change\": \"$.change\", \"volume\": \"$.volume\", \"timestamp\": \"$.timestamp\"}",
|
||
"build_rule": "{\"stock\": \"$symbol\", \"current_price\": \"$price\", \"daily_change\": \"$change\", \"trade_volume\": \"$volume\", \"updated_at\": \"$timestamp\"}",
|
||
"interval": 0,
|
||
"enabled": true
|
||
}
|
||
```
|
||
|
||
### 示例4:NATS到HTTP
|
||
|
||
创建一个从NATS接收消息并转发到HTTP API的规则:
|
||
|
||
```json
|
||
{
|
||
"id": "nats-to-http",
|
||
"name": "NATS消息转HTTP",
|
||
"input_config": {
|
||
"source_type": "NATS",
|
||
"address": "nats://nats-server:4222",
|
||
"topic": "events.incoming",
|
||
"auth": {
|
||
"username": "nats_user",
|
||
"password": "nats_pass"
|
||
}
|
||
},
|
||
"output_config": {
|
||
"target_type": "HTTPAPI",
|
||
"address": "https://api.example.com/events",
|
||
"headers": {
|
||
"Content-Type": "application/json",
|
||
"Authorization": "Bearer token123"
|
||
}
|
||
},
|
||
"extract_rule": "{\"event_id\": \"$.id\", \"event_type\": \"$.type\", \"payload\": \"$.data\", \"timestamp\": \"$.timestamp\"}",
|
||
"build_rule": "{\"id\": \"$event_id\", \"type\": \"$event_type\", \"data\": \"$payload\", \"received_at\": \"$timestamp\", \"processed_at\": \"new Date().toISOString()\"}",
|
||
"interval": 0,
|
||
"enabled": true
|
||
}
|
||
```
|
||
|
||
### 示例5:HTTP到NATS
|
||
|
||
创建一个定期从HTTP API获取数据并发布到NATS的规则:
|
||
|
||
```json
|
||
{
|
||
"id": "http-to-nats",
|
||
"name": "HTTP数据发布到NATS",
|
||
"input_config": {
|
||
"source_type": "HTTPAPI",
|
||
"address": "https://api.example.com/status",
|
||
"headers": {
|
||
"API-Key": "your-api-key"
|
||
}
|
||
},
|
||
"output_config": {
|
||
"target_type": "NATS",
|
||
"address": "nats://nats-server:4222",
|
||
"topic": "system.status",
|
||
"auth": {
|
||
"username": "nats_user",
|
||
"password": "nats_pass"
|
||
}
|
||
},
|
||
"extract_rule": "{\"status\": \"$.status\", \"metrics\": \"$.metrics\", \"timestamp\": \"$.updated_at\"}",
|
||
"build_rule": "{\"system_status\": \"$status\", \"performance_metrics\": \"$metrics\", \"time\": \"$timestamp\"}",
|
||
"interval": 60,
|
||
"enabled": true
|
||
}
|
||
```
|
||
|
||
### 示例6:Redis到MQTT
|
||
|
||
创建一个从Redis通道订阅数据并转发到MQTT的规则:
|
||
|
||
```json
|
||
{
|
||
"id": "redis-to-mqtt",
|
||
"name": "Redis消息转MQTT",
|
||
"input_config": {
|
||
"source_type": "REDIS",
|
||
"address": "redis://redis-server:6379",
|
||
"topic": "notifications",
|
||
"auth": {
|
||
"password": "redis_password"
|
||
}
|
||
},
|
||
"output_config": {
|
||
"target_type": "MQTT",
|
||
"address": "tcp://mqtt-broker:1883",
|
||
"topic": "app/notifications",
|
||
"auth": {
|
||
"username": "mqtt_user",
|
||
"password": "mqtt_pass"
|
||
}
|
||
},
|
||
"extract_rule": "{\"user_id\": \"$.user\", \"message\": \"$.content\", \"priority\": \"$.priority\", \"created_at\": \"$.time\"}",
|
||
"build_rule": "{\"notification\": {\"to\": \"$user_id\", \"message\": \"$message\", \"level\": \"$priority\"}, \"timestamp\": \"$created_at\"}",
|
||
"interval": 0,
|
||
"enabled": true
|
||
}
|
||
```
|
||
|
||
### 示例7:MQTT到Redis
|
||
|
||
创建一个从MQTT订阅数据并发布到Redis通道的规则:
|
||
|
||
```json
|
||
{
|
||
"id": "mqtt-to-redis",
|
||
"name": "MQTT数据转Redis",
|
||
"input_config": {
|
||
"source_type": "MQTT",
|
||
"address": "tcp://mqtt-broker:1883",
|
||
"topic": "devices/+/telemetry",
|
||
"auth": {
|
||
"username": "mqtt_user",
|
||
"password": "mqtt_pass"
|
||
}
|
||
},
|
||
"output_config": {
|
||
"target_type": "REDIS",
|
||
"address": "redis://redis-server:6379",
|
||
"topic": "device-telemetry",
|
||
"auth": {
|
||
"password": "redis_password"
|
||
}
|
||
},
|
||
"extract_rule": "{\"device_id\": \"$.device\", \"metrics\": \"$.telemetry\", \"battery\": \"$.battery\", \"timestamp\": \"$.time\"}",
|
||
"build_rule": "{\"id\": \"$device_id\", \"data\": \"$metrics\", \"battery_level\": \"$battery\", \"reported_at\": \"$timestamp\"}",
|
||
"interval": 0,
|
||
"enabled": true
|
||
}
|
||
```
|
||
|
||
## 高级功能
|
||
|
||
### 使用JavaScript表达式
|
||
|
||
规则引擎支持在提取规则和构建规则中使用JavaScript表达式进行数据转换:
|
||
|
||
```json
|
||
{
|
||
"extract_rule": "{\"celsius\": \"$.temperature\", \"humidity\": \"$.humidity\"}",
|
||
"build_rule": "{\"temperature_f\": \"$celsius * 1.8 + 32\", \"humidity_percent\": \"$humidity * 100\", \"status\": \"$celsius > 30 ? 'hot' : ($celsius < 10 ? 'cold' : 'moderate')\"}"
|
||
}
|
||
```
|
||
|
||
### 数据批处理
|
||
|
||
对于需要批量处理的场景,可以配置批处理规则:
|
||
|
||
```json
|
||
{
|
||
"input_config": {
|
||
"source_type": "HTTPAPI",
|
||
"address": "https://api.example.com/batch-data",
|
||
"batch_size": 100,
|
||
"batch_timeout": 30
|
||
},
|
||
"extract_rule": "{\"items\": \"$.data[*]\", \"timestamp\": \"$.meta.timestamp\"}",
|
||
"build_rule": "{\"processed_items\": \"$items.map(item => ({id: item.id, value: item.value * 2}))\", \"batch_time\": \"$timestamp\", \"count\": \"$items.length\"}"
|
||
}
|
||
```
|
||
|
||
### 条件路由
|
||
|
||
根据数据内容条件性地选择不同的输出目标:
|
||
|
||
```json
|
||
{
|
||
"id": "conditional-routing",
|
||
"name": "条件路由示例",
|
||
"input_config": {
|
||
"source_type": "MQTT",
|
||
"address": "tcp://mqtt.example.com:1883",
|
||
"topic": "devices/+/data"
|
||
},
|
||
"output_config": {
|
||
"target_type": "HTTPAPI",
|
||
"address": "https://api.example.com/data",
|
||
"conditional_routes": [
|
||
{
|
||
"condition": "$device_type == 'temperature'",
|
||
"address": "https://api.example.com/temperature"
|
||
},
|
||
{
|
||
"condition": "$device_type == 'humidity'",
|
||
"address": "https://api.example.com/humidity"
|
||
}
|
||
]
|
||
},
|
||
"extract_rule": "{\"device_id\": \"$.device_id\", \"device_type\": \"$.type\", \"value\": \"$.value\"}",
|
||
"build_rule": "{\"id\": \"$device_id\", \"reading\": \"$value\", \"timestamp\": \"new Date().toISOString()\"}",
|
||
"enabled": true
|
||
}
|
||
```
|
||
|
||
## 项目结构
|
||
|
||
```
|
||
rule-engine/
|
||
├── api/ # API接口层
|
||
│ └── handler.go # API处理器
|
||
├── cmd/ # 命令行工具
|
||
│ └── server/ # 服务器入口
|
||
│ └── main.go # 主程序入口
|
||
├── internal/ # 内部包
|
||
│ ├── adapter/ # 协议适配器
|
||
│ │ ├── adapter.go # 适配器接口定义
|
||
│ │ ├── http_adapter.go # HTTP适配器
|
||
│ │ ├── mqtt_adapter.go # MQTT适配器
|
||
│ │ ├── websocket_adapter.go # WebSocket适配器
|
||
│ │ ├── nats_adapter.go # NATS适配器
|
||
│ │ └── redis_adapter.go # Redis适配器
|
||
│ ├── config/ # 配置管理
|
||
│ │ └── config.go # 配置加载和管理
|
||
│ ├── engine/ # 规则引擎核心
|
||
│ │ ├── engine.go # 规则引擎实现
|
||
│ │ ├── engine_interface.go # 规则引擎接口
|
||
│ │ └── processor.go # 数据处理器
|
||
│ ├── middleware/ # 中间件
|
||
│ │ ├── logger.go # 日志中间件
|
||
│ │ ├── recovery.go # 恢复中间件
|
||
│ │ └── error_handler.go # 错误处理中间件
|
||
│ ├── model/ # 数据模型
|
||
│ │ └── rule.go # 规则模型定义
|
||
│ ├── repository/ # 数据仓库
|
||
│ │ └── rule_repository.go # 规则仓库
|
||
│ ├── service/ # 业务服务
|
||
│ │ ├── rule_service.go # 规则服务
|
||
│ │ └── diagnostic_service.go # 诊断服务
|
||
│ └── validator/ # 数据验证
|
||
│ └── rule_validator.go # 规则验证器
|
||
├── pkg/ # 公共包
|
||
│ ├── logger/ # 日志工具
|
||
│ │ └── logger.go # 日志实现
|
||
│ ├── utils/ # 工具函数
|
||
│ │ ├── json.go # JSON工具
|
||
│ │ └── template.go # 模板工具
|
||
│ └── trace/ # 分布式追踪
|
||
│ └── tracer.go # 追踪实现
|
||
├── web/ # Web界面
|
||
│ └── dist/ # 编译后的前端资源
|
||
├── rules/ # 规则存储目录
|
||
├── logs/ # 日志文件目录
|
||
├── .air.toml # Air热重载配置
|
||
├── .gitignore # Git忽略配置
|
||
├── config.yaml # 配置文件
|
||
├── Dockerfile # Docker构建文件
|
||
├── docker-compose.yml # Docker Compose配置
|
||
├── go.mod # Go模块定义
|
||
├── go.sum # 依赖版本锁定
|
||
├── Makefile # 构建脚本
|
||
└── README.md # 项目说明
|
||
```
|
||
|
||
## 性能优化
|
||
|
||
规则引擎设计了多项性能优化机制:
|
||
|
||
1. **对象池**:减少频繁的内存分配和GC压力
|
||
2. **连接池**:复用网络连接,减少连接建立的开销
|
||
3. **批处理机制**:支持数据批量处理,减少网络交互
|
||
4. **并发处理**:充分利用Go语言的goroutine特性
|
||
5. **缓存机制**:缓存规则解析结果,提高执行效率
|
||
|
||
## 可观测性
|
||
|
||
### 分布式追踪
|
||
|
||
项目集成了OpenTelemetry分布式追踪支持,可以追踪:
|
||
|
||
- HTTP请求处理链路
|
||
- 规则执行流程
|
||
- 数据源连接和数据获取
|
||
- 数据处理和转换
|
||
- 数据推送到目标系统
|
||
|
||
支持多种追踪后端:
|
||
- Jaeger
|
||
- OpenTelemetry Collector
|
||
- 标准输出(用于调试)
|
||
|
||
### 性能指标
|
||
|
||
使用Prometheus收集关键指标:
|
||
|
||
- 规则执行次数和耗时
|
||
- 消息处理量和队列状态
|
||
- 错误计数和类型统计
|
||
- 连接池状态监控
|
||
- 系统资源使用情况
|
||
- GC统计信息
|
||
|
||
## 部署指南
|
||
|
||
### 独立部署
|
||
|
||
```bash
|
||
# 构建二进制文件
|
||
go build -o rule-engine cmd/server/main.go
|
||
|
||
# 创建配置和规则目录
|
||
mkdir -p rules logs
|
||
|
||
# 运行服务
|
||
./rule-engine --config=config.yaml
|
||
```
|
||
|
||
### Docker部署
|
||
|
||
```bash
|
||
# 构建Docker镜像
|
||
docker build -t rule-engine .
|
||
|
||
# 运行容器
|
||
docker run -d --name rule-engine \
|
||
-p 8080:8080 \
|
||
-v $(pwd)/config.yaml:/app/config.yaml \
|
||
-v $(pwd)/rules:/app/rules \
|
||
-v $(pwd)/logs:/app/logs \
|
||
rule-engine
|
||
```
|
||
|
||
### Kubernetes部署
|
||
|
||
创建ConfigMap和Deployment:
|
||
|
||
```yaml
|
||
apiVersion: v1
|
||
kind: ConfigMap
|
||
metadata:
|
||
name: rule-engine-config
|
||
data:
|
||
config.yaml: |
|
||
server:
|
||
port: 8080
|
||
host: 0.0.0.0
|
||
# 其他配置...
|
||
|
||
---
|
||
apiVersion: apps/v1
|
||
kind: Deployment
|
||
metadata:
|
||
name: rule-engine
|
||
spec:
|
||
replicas: 1
|
||
selector:
|
||
matchLabels:
|
||
app: rule-engine
|
||
template:
|
||
metadata:
|
||
labels:
|
||
app: rule-engine
|
||
spec:
|
||
containers:
|
||
- name: rule-engine
|
||
image: rule-engine:latest
|
||
ports:
|
||
- containerPort: 8080
|
||
volumeMounts:
|
||
- name: config
|
||
mountPath: /app/config.yaml
|
||
subPath: config.yaml
|
||
- name: rules
|
||
mountPath: /app/rules
|
||
- name: logs
|
||
mountPath: /app/logs
|
||
volumes:
|
||
- name: config
|
||
configMap:
|
||
name: rule-engine-config
|
||
- name: rules
|
||
persistentVolumeClaim:
|
||
claimName: rule-engine-rules
|
||
- name: logs
|
||
persistentVolumeClaim:
|
||
claimName: rule-engine-logs
|
||
```
|
||
|
||
## 故障排查
|
||
|
||
### 常见问题
|
||
|
||
1. **规则无法启动**
|
||
- 检查规则配置是否正确
|
||
- 检查数据源连接是否可用
|
||
- 查看日志中的错误信息
|
||
|
||
2. **数据未能成功转换**
|
||
- 验证提取规则是否匹配输入数据结构
|
||
- 使用测试API验证规则执行过程
|
||
- 检查数据格式是否符合预期
|
||
|
||
3. **性能问题**
|
||
- 调整批处理参数
|
||
- 优化规则执行间隔
|
||
- 检查网络连接和系统资源使用情况
|
||
|
||
### 日志分析
|
||
|
||
日志文件默认位于 `./logs/rule-engine.log`,支持不同的日志级别:
|
||
|
||
- DEBUG:详细调试信息
|
||
- INFO:一般操作信息
|
||
- WARN:警告信息
|
||
- ERROR:错误信息
|
||
- FATAL:致命错误
|
||
|
||
### 诊断命令
|
||
|
||
使用API进行系统诊断:
|
||
|
||
```bash
|
||
# 获取系统状态
|
||
curl http://localhost:8080/api/status
|
||
|
||
# 获取规则状态
|
||
curl http://localhost:8080/api/rules/{id}
|
||
|
||
# 诊断规则
|
||
curl -X POST http://localhost:8080/api/rules/{id}/diagnose
|
||
```
|
||
|
||
## 许可证
|
||
|
||
MIT |