25 KiB
Go语言规则引擎
一个基于Go语言实现的灵活、高性能规则引擎,用于实现不同协议之间的数据转换和路由。
功能特性
- 多协议支持:支持MQTT、HTTP API、WebSocket、NATS、Redis等不同协议之间的数据交互
- 灵活的规则配置:基于JSON配置的提取规则和构建规则
- 事件驱动模式:支持基于事件的数据处理模式,如MQTT订阅
- 轮询模式:支持定时轮询获取数据的处理模式
- RESTful API:完整的规则管理API接口
- 动态规则管理:支持规则的热加载和动态更新
- 可视化配置界面:内置Web管理界面,支持规则的可视化配置
- 规则诊断功能:提供规则测试和诊断功能
- 高性能设计:基于Go语言的并发模型实现
- 可靠性保障:完善的错误处理和日志记录
- 分布式追踪支持:集成OpenTelemetry分布式追踪功能
- 指标监控:支持Prometheus指标收集和监控
系统架构
规则引擎由以下核心模块组成:
1. 规则管理模块
负责规则的增删改查和持久化存储,主要包含:
- 规则配置文件加载与解析
- 规则CRUD操作接口
- 规则状态管理(启用/禁用)
- 规则版本管理
2. 协议适配器模块
负责不同协议的数据获取和数据推送,主要包含:
- MQTT适配器:支持MQTT协议的发布与订阅
- HTTP适配器:支持HTTP API的请求与响应
- WebSocket适配器:支持WebSocket的连接与通信
- NATS适配器:支持NATS消息系统的发布与订阅
- Redis适配器:支持Redis的Pub/Sub功能
- 适配器注册表:动态管理不同类型的适配器
- 适配器工厂:根据规则创建对应的适配器实例
协议适配器实现了以下核心接口:
DataFetcher:数据获取接口DataPusher:数据推送接口EventDrivenFetcher:事件驱动数据获取接口
3. 数据处理引擎
负责数据提取和消息构建的核心逻辑,主要包含:
- JSON数据解析与处理
- 基于规则的数据提取
- 基于模板的消息构建
- 支持JavaScript表达式的数据转换
4. API模块
提供完整的RESTful管理接口,支持:
- 规则的CRUD操作
- 规则启动/停止控制
- 规则测试与诊断
- 系统状态监控
5. Web管理界面
提供直观的可视化配置界面,支持:
- 规则列表管理
- 可视化规则编辑
- 规则测试工具
- 运行状态监控
- 系统配置管理
6. 可观测性模块
提供系统监控和故障诊断能力:
- 分布式追踪:基于OpenTelemetry
- 性能指标:支持Prometheus指标收集
- 结构化日志:多级别、多输出的日志系统
快速开始
环境要求
- Go 1.19或更高版本
- 若使用MQTT功能,需要有可访问的MQTT Broker
- 若使用NATS功能,需要有可访问的NATS服务器
- 若使用Redis功能,需要有可访问的Redis服务器
从源码安装
# 克隆代码库
git clone https://github.com/darkit/rule-engine.git
# 进入项目目录
cd rule-engine
# 安装依赖
go mod tidy
# 编译
go build -o rule-engine cmd/server/main.go
使用Docker运行
# 构建Docker镜像
docker build -t rule-engine .
# 运行容器
docker run -p 8080:8080 -v ./config.yaml:/app/config.yaml -v ./rules:/app/rules rule-engine
使用Docker Compose运行
# 启动服务
docker-compose up -d
配置指南
主配置文件
编辑 config.yaml 文件:
server:
port: 8080 # 服务器端口
host: 0.0.0.0 # 监听地址
read_timeout: 60 # 读取超时时间(秒)
write_timeout: 60 # 写入超时时间(秒)
shutdown_timeout: 10 # 优雅关闭超时时间(秒)
storage:
type: file # 存储类型:file/db
rule_path: ./rules # 规则文件存储路径
logger:
level: info # 日志级别:debug/info/warn/error
file: ./logs/rule-engine.log # 日志文件路径
max_size: 10 # 单个日志文件最大大小(MB)
max_backup: 5 # 保留的旧日志文件数量
max_age: 30 # 旧日志文件保留最大天数
compress: true # 是否压缩旧日志文件
console: true # 是否输出到控制台
security:
enable_tls: false # 是否启用TLS
cert_file: "" # TLS证书文件路径
key_file: "" # TLS密钥文件路径
allowed_origins: "*" # CORS允许的来源
telemetry:
trace:
enabled: true # 是否启用分布式追踪
service_name: "rule-engine" # 服务名称
service_version: "1.0.0" # 服务版本
environment: "development" # 环境名称
sampling_ratio: 0.1 # 采样率
exporter_type: "jaeger" # 导出类型:jaeger/otlp/stdout/none
jaeger_endpoint: "http://localhost:14268/api/traces" # Jaeger接入点
otlp_endpoint: "localhost:4317" # OTLP接入点
规则配置示例
规则以JSON格式存储,每个规则对应一个JSON文件,存放在rules目录下:
{
"id": "rule-1",
"name": "MQTT到HTTP的转换规则",
"input_config": {
"source_type": "MQTT",
"address": "tcp://broker.example.com:1883",
"topic": "sensors/temperature",
"auth": {
"username": "mqtt_user",
"password": "mqtt_password"
}
},
"output_config": {
"target_type": "HTTPAPI",
"address": "https://api.example.com/data",
"headers": {
"Content-Type": "application/json",
"Authorization": "Bearer token123"
}
},
"extract_rule": "{\"temp\": \"$.temperature\", \"humidity\": \"$.humidity\", \"timestamp\": \"$.time\"}",
"build_rule": "{\"value\": \"$temp\", \"humidity_level\": \"$humidity\", \"recorded_at\": \"$timestamp\", \"device_type\": \"temperature_sensor\"}",
"interval": 10,
"enabled": true
}
规则配置详解
输入配置 (input_config)
| 字段 | 类型 | 描述 | 适用协议 |
|---|---|---|---|
| source_type | string | 数据源类型:MQTT/HTTPAPI/WEBSOCKET/NATS/REDIS | 全部 |
| address | string | 数据源地址 | 全部 |
| request_template | string | 请求模板 | HTTP |
| headers | map | HTTP头信息 | HTTP |
| topic | string | 主题/通道名称 | MQTT/NATS/REDIS |
| auth | object | 认证信息 | 全部 |
输出配置 (output_config)
| 字段 | 类型 | 描述 | 适用协议 |
|---|---|---|---|
| target_type | string | 目标类型:MQTT/HTTPAPI/WEBSOCKET/NATS/REDIS | 全部 |
| address | string | 目标地址 | 全部 |
| field_template | string | 字段模板(可选) | 全部 |
| headers | map | HTTP头信息 | HTTP |
| topic | string | 主题/通道名称 | MQTT/NATS/REDIS |
| auth | object | 认证信息 | 全部 |
提取规则 (extract_rule)
采用JSON格式,基于JSONPath表达式提取数据:
{
"字段名": "$.json.path.表达式",
"温度": "$.data.temperature",
"湿度": "$.data.humidity"
}
构建规则 (build_rule)
采用JSON格式,使用$变量名引用提取的数据:
{
"value": "$温度",
"humidity": "$湿度",
"device_id": "sensor-01",
"timestamp": "$时间戳"
}
API接口文档
规则管理接口
获取所有规则
GET /api/rules
响应示例:
{
"code": 0,
"message": "success",
"data": [
{
"id": "rule-1",
"name": "MQTT到HTTP的转换规则",
"input_config": { /* ... */ },
"output_config": { /* ... */ },
"extract_rule": "{ /* ... */ }",
"build_rule": "{ /* ... */ }",
"interval": 10,
"enabled": true
}
]
}
获取单个规则
GET /api/rules/{id}
响应示例:
{
"code": 0,
"message": "success",
"data": {
"id": "rule-1",
"name": "MQTT到HTTP的转换规则",
"input_config": { /* ... */ },
"output_config": { /* ... */ },
"extract_rule": "{ /* ... */ }",
"build_rule": "{ /* ... */ }",
"interval": 10,
"enabled": true
}
}
创建规则
POST /api/rules
请求体:
{
"id": "rule-1",
"name": "MQTT到HTTP的转换规则",
"input_config": { /* ... */ },
"output_config": { /* ... */ },
"extract_rule": "{ /* ... */ }",
"build_rule": "{ /* ... */ }",
"interval": 10,
"enabled": true
}
响应示例:
{
"code": 0,
"message": "规则创建成功",
"data": {
"id": "rule-1"
}
}
更新规则
PUT /api/rules/{id}
请求体:同创建规则
响应示例:
{
"code": 0,
"message": "规则更新成功",
"data": null
}
删除规则
DELETE /api/rules/{id}
响应示例:
{
"code": 0,
"message": "规则删除成功",
"data": null
}
规则操作接口
启动规则
POST /api/rules/{id}/start
响应示例:
{
"code": 0,
"message": "规则启动成功",
"data": null
}
停止规则
POST /api/rules/{id}/stop
响应示例:
{
"code": 0,
"message": "规则停止成功",
"data": null
}
测试规则
POST /api/rules/{id}/test
请求体:
{
"test_data": { /* 测试输入数据 */ },
"extract_rule": { /* 可选:提取规则 */ },
"build_rule": { /* 可选:构建规则 */ }
}
响应示例:
{
"code": 0,
"message": "规则测试成功",
"data": {
"input_data": { /* 输入数据 */ },
"extracted_data": { /* 提取的数据 */ },
"output_data": { /* 构建的输出数据 */ }
}
}
诊断规则
POST /api/rules/{id}/diagnose
响应示例:
{
"code": 0,
"message": "规则诊断成功",
"data": {
"rule_id": "rule-1",
"status": "running",
"last_execution": "2023-03-15T08:10:30Z",
"execution_count": 156,
"success_count": 152,
"error_count": 4,
"last_error": "连接超时",
"performance": {
"avg_time": 28.5,
"max_time": 120.3,
"min_time": 10.2
}
}
}
系统状态接口
GET /api/status
响应示例:
{
"code": 0,
"message": "success",
"data": {
"uptime": "3d 5h 12m 30s",
"version": "1.0.0",
"rules": {
"total": 5,
"running": 3,
"stopped": 2
},
"system": {
"cpu_usage": 2.5,
"memory_usage": "128MB",
"goroutines": 15
}
}
}
使用示例
示例1:MQTT转HTTP
创建一个从MQTT接收温度传感器数据并转发到HTTP API的规则:
{
"id": "mqtt-to-http",
"name": "温度传感器数据转发",
"input_config": {
"source_type": "MQTT",
"address": "tcp://mqtt.example.com:1883",
"topic": "sensors/temperature",
"auth": {
"username": "mqtt_user",
"password": "mqtt_pass"
}
},
"output_config": {
"target_type": "HTTPAPI",
"address": "https://api.example.com/data",
"headers": {
"Content-Type": "application/json",
"Authorization": "Bearer token123"
}
},
"extract_rule": "{\"device_id\": \"$.device_id\", \"temperature\": \"$.readings.temperature\", \"humidity\": \"$.readings.humidity\", \"timestamp\": \"$.timestamp\"}",
"build_rule": "{\"device\": \"$device_id\", \"readings\": {\"temp\": \"$temperature\", \"hum\": \"$humidity\"}, \"time\": \"$timestamp\", \"source\": \"sensor-network\"}",
"interval": 0,
"enabled": true
}
示例2:HTTP轮询转MQTT
创建一个定期从HTTP API获取数据并发布到MQTT的规则:
{
"id": "http-poll-to-mqtt",
"name": "天气数据采集",
"input_config": {
"source_type": "HTTPAPI",
"address": "https://api.weather.com/current?location=beijing",
"headers": {
"API-Key": "your-api-key"
}
},
"output_config": {
"target_type": "MQTT",
"address": "tcp://mqtt.example.com:1883",
"topic": "weather/beijing",
"auth": {
"username": "mqtt_user",
"password": "mqtt_pass"
}
},
"extract_rule": "{\"temperature\": \"$.current.temperature\", \"humidity\": \"$.current.humidity\", \"wind\": \"$.current.wind\", \"updated_at\": \"$.current.updated_at\"}",
"build_rule": "{\"weather\": {\"temp\": \"$temperature\", \"humidity\": \"$humidity\", \"wind_speed\": \"$wind.speed\", \"wind_direction\": \"$wind.direction\"}, \"location\": \"beijing\", \"timestamp\": \"$updated_at\"}",
"interval": 300,
"enabled": true
}
示例3:WebSocket到MQTT
创建一个从WebSocket接收实时数据并转发到MQTT的规则:
{
"id": "websocket-to-mqtt",
"name": "股票行情转发",
"input_config": {
"source_type": "WEBSOCKET",
"address": "wss://realtime.stock-api.com/quotes",
"headers": {
"Authorization": "Bearer token123"
}
},
"output_config": {
"target_type": "MQTT",
"address": "tcp://mqtt.example.com:1883",
"topic": "stocks/realtime",
"auth": {
"username": "mqtt_user",
"password": "mqtt_pass"
}
},
"extract_rule": "{\"symbol\": \"$.symbol\", \"price\": \"$.price\", \"change\": \"$.change\", \"volume\": \"$.volume\", \"timestamp\": \"$.timestamp\"}",
"build_rule": "{\"stock\": \"$symbol\", \"current_price\": \"$price\", \"daily_change\": \"$change\", \"trade_volume\": \"$volume\", \"updated_at\": \"$timestamp\"}",
"interval": 0,
"enabled": true
}
示例4:NATS到HTTP
创建一个从NATS接收消息并转发到HTTP API的规则:
{
"id": "nats-to-http",
"name": "NATS消息转HTTP",
"input_config": {
"source_type": "NATS",
"address": "nats://nats-server:4222",
"topic": "events.incoming",
"auth": {
"username": "nats_user",
"password": "nats_pass"
}
},
"output_config": {
"target_type": "HTTPAPI",
"address": "https://api.example.com/events",
"headers": {
"Content-Type": "application/json",
"Authorization": "Bearer token123"
}
},
"extract_rule": "{\"event_id\": \"$.id\", \"event_type\": \"$.type\", \"payload\": \"$.data\", \"timestamp\": \"$.timestamp\"}",
"build_rule": "{\"id\": \"$event_id\", \"type\": \"$event_type\", \"data\": \"$payload\", \"received_at\": \"$timestamp\", \"processed_at\": \"new Date().toISOString()\"}",
"interval": 0,
"enabled": true
}
示例5:HTTP到NATS
创建一个定期从HTTP API获取数据并发布到NATS的规则:
{
"id": "http-to-nats",
"name": "HTTP数据发布到NATS",
"input_config": {
"source_type": "HTTPAPI",
"address": "https://api.example.com/status",
"headers": {
"API-Key": "your-api-key"
}
},
"output_config": {
"target_type": "NATS",
"address": "nats://nats-server:4222",
"topic": "system.status",
"auth": {
"username": "nats_user",
"password": "nats_pass"
}
},
"extract_rule": "{\"status\": \"$.status\", \"metrics\": \"$.metrics\", \"timestamp\": \"$.updated_at\"}",
"build_rule": "{\"system_status\": \"$status\", \"performance_metrics\": \"$metrics\", \"time\": \"$timestamp\"}",
"interval": 60,
"enabled": true
}
示例6:Redis到MQTT
创建一个从Redis通道订阅数据并转发到MQTT的规则:
{
"id": "redis-to-mqtt",
"name": "Redis消息转MQTT",
"input_config": {
"source_type": "REDIS",
"address": "redis://redis-server:6379",
"topic": "notifications",
"auth": {
"password": "redis_password"
}
},
"output_config": {
"target_type": "MQTT",
"address": "tcp://mqtt-broker:1883",
"topic": "app/notifications",
"auth": {
"username": "mqtt_user",
"password": "mqtt_pass"
}
},
"extract_rule": "{\"user_id\": \"$.user\", \"message\": \"$.content\", \"priority\": \"$.priority\", \"created_at\": \"$.time\"}",
"build_rule": "{\"notification\": {\"to\": \"$user_id\", \"message\": \"$message\", \"level\": \"$priority\"}, \"timestamp\": \"$created_at\"}",
"interval": 0,
"enabled": true
}
示例7:MQTT到Redis
创建一个从MQTT订阅数据并发布到Redis通道的规则:
{
"id": "mqtt-to-redis",
"name": "MQTT数据转Redis",
"input_config": {
"source_type": "MQTT",
"address": "tcp://mqtt-broker:1883",
"topic": "devices/+/telemetry",
"auth": {
"username": "mqtt_user",
"password": "mqtt_pass"
}
},
"output_config": {
"target_type": "REDIS",
"address": "redis://redis-server:6379",
"topic": "device-telemetry",
"auth": {
"password": "redis_password"
}
},
"extract_rule": "{\"device_id\": \"$.device\", \"metrics\": \"$.telemetry\", \"battery\": \"$.battery\", \"timestamp\": \"$.time\"}",
"build_rule": "{\"id\": \"$device_id\", \"data\": \"$metrics\", \"battery_level\": \"$battery\", \"reported_at\": \"$timestamp\"}",
"interval": 0,
"enabled": true
}
高级功能
使用JavaScript表达式
规则引擎支持在提取规则和构建规则中使用JavaScript表达式进行数据转换:
{
"extract_rule": "{\"celsius\": \"$.temperature\", \"humidity\": \"$.humidity\"}",
"build_rule": "{\"temperature_f\": \"$celsius * 1.8 + 32\", \"humidity_percent\": \"$humidity * 100\", \"status\": \"$celsius > 30 ? 'hot' : ($celsius < 10 ? 'cold' : 'moderate')\"}"
}
数据批处理
对于需要批量处理的场景,可以配置批处理规则:
{
"input_config": {
"source_type": "HTTPAPI",
"address": "https://api.example.com/batch-data",
"batch_size": 100,
"batch_timeout": 30
},
"extract_rule": "{\"items\": \"$.data[*]\", \"timestamp\": \"$.meta.timestamp\"}",
"build_rule": "{\"processed_items\": \"$items.map(item => ({id: item.id, value: item.value * 2}))\", \"batch_time\": \"$timestamp\", \"count\": \"$items.length\"}"
}
条件路由
根据数据内容条件性地选择不同的输出目标:
{
"id": "conditional-routing",
"name": "条件路由示例",
"input_config": {
"source_type": "MQTT",
"address": "tcp://mqtt.example.com:1883",
"topic": "devices/+/data"
},
"output_config": {
"target_type": "HTTPAPI",
"address": "https://api.example.com/data",
"conditional_routes": [
{
"condition": "$device_type == 'temperature'",
"address": "https://api.example.com/temperature"
},
{
"condition": "$device_type == 'humidity'",
"address": "https://api.example.com/humidity"
}
]
},
"extract_rule": "{\"device_id\": \"$.device_id\", \"device_type\": \"$.type\", \"value\": \"$.value\"}",
"build_rule": "{\"id\": \"$device_id\", \"reading\": \"$value\", \"timestamp\": \"new Date().toISOString()\"}",
"enabled": true
}
项目结构
rule-engine/
├── api/ # API接口层
│ └── handler.go # API处理器
├── cmd/ # 命令行工具
│ └── server/ # 服务器入口
│ └── main.go # 主程序入口
├── internal/ # 内部包
│ ├── adapter/ # 协议适配器
│ │ ├── adapter.go # 适配器接口定义
│ │ ├── http_adapter.go # HTTP适配器
│ │ ├── mqtt_adapter.go # MQTT适配器
│ │ ├── websocket_adapter.go # WebSocket适配器
│ │ ├── nats_adapter.go # NATS适配器
│ │ └── redis_adapter.go # Redis适配器
│ ├── config/ # 配置管理
│ │ └── config.go # 配置加载和管理
│ ├── engine/ # 规则引擎核心
│ │ ├── engine.go # 规则引擎实现
│ │ ├── engine_interface.go # 规则引擎接口
│ │ └── processor.go # 数据处理器
│ ├── middleware/ # 中间件
│ │ ├── logger.go # 日志中间件
│ │ ├── recovery.go # 恢复中间件
│ │ └── error_handler.go # 错误处理中间件
│ ├── model/ # 数据模型
│ │ └── rule.go # 规则模型定义
│ ├── repository/ # 数据仓库
│ │ └── rule_repository.go # 规则仓库
│ ├── service/ # 业务服务
│ │ ├── rule_service.go # 规则服务
│ │ └── diagnostic_service.go # 诊断服务
│ └── validator/ # 数据验证
│ └── rule_validator.go # 规则验证器
├── pkg/ # 公共包
│ ├── logger/ # 日志工具
│ │ └── logger.go # 日志实现
│ ├── utils/ # 工具函数
│ │ ├── json.go # JSON工具
│ │ └── template.go # 模板工具
│ └── trace/ # 分布式追踪
│ └── tracer.go # 追踪实现
├── web/ # Web界面
│ └── dist/ # 编译后的前端资源
├── rules/ # 规则存储目录
├── logs/ # 日志文件目录
├── .air.toml # Air热重载配置
├── .gitignore # Git忽略配置
├── config.yaml # 配置文件
├── Dockerfile # Docker构建文件
├── docker-compose.yml # Docker Compose配置
├── go.mod # Go模块定义
├── go.sum # 依赖版本锁定
├── Makefile # 构建脚本
└── README.md # 项目说明
性能优化
规则引擎设计了多项性能优化机制:
- 对象池:减少频繁的内存分配和GC压力
- 连接池:复用网络连接,减少连接建立的开销
- 批处理机制:支持数据批量处理,减少网络交互
- 并发处理:充分利用Go语言的goroutine特性
- 缓存机制:缓存规则解析结果,提高执行效率
可观测性
分布式追踪
项目集成了OpenTelemetry分布式追踪支持,可以追踪:
- HTTP请求处理链路
- 规则执行流程
- 数据源连接和数据获取
- 数据处理和转换
- 数据推送到目标系统
支持多种追踪后端:
- Jaeger
- OpenTelemetry Collector
- 标准输出(用于调试)
性能指标
使用Prometheus收集关键指标:
- 规则执行次数和耗时
- 消息处理量和队列状态
- 错误计数和类型统计
- 连接池状态监控
- 系统资源使用情况
- GC统计信息
部署指南
独立部署
# 构建二进制文件
go build -o rule-engine cmd/server/main.go
# 创建配置和规则目录
mkdir -p rules logs
# 运行服务
./rule-engine --config=config.yaml
Docker部署
# 构建Docker镜像
docker build -t rule-engine .
# 运行容器
docker run -d --name rule-engine \
-p 8080:8080 \
-v $(pwd)/config.yaml:/app/config.yaml \
-v $(pwd)/rules:/app/rules \
-v $(pwd)/logs:/app/logs \
rule-engine
Kubernetes部署
创建ConfigMap和Deployment:
apiVersion: v1
kind: ConfigMap
metadata:
name: rule-engine-config
data:
config.yaml: |
server:
port: 8080
host: 0.0.0.0
# 其他配置...
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: rule-engine
spec:
replicas: 1
selector:
matchLabels:
app: rule-engine
template:
metadata:
labels:
app: rule-engine
spec:
containers:
- name: rule-engine
image: rule-engine:latest
ports:
- containerPort: 8080
volumeMounts:
- name: config
mountPath: /app/config.yaml
subPath: config.yaml
- name: rules
mountPath: /app/rules
- name: logs
mountPath: /app/logs
volumes:
- name: config
configMap:
name: rule-engine-config
- name: rules
persistentVolumeClaim:
claimName: rule-engine-rules
- name: logs
persistentVolumeClaim:
claimName: rule-engine-logs
故障排查
常见问题
-
规则无法启动
- 检查规则配置是否正确
- 检查数据源连接是否可用
- 查看日志中的错误信息
-
数据未能成功转换
- 验证提取规则是否匹配输入数据结构
- 使用测试API验证规则执行过程
- 检查数据格式是否符合预期
-
性能问题
- 调整批处理参数
- 优化规则执行间隔
- 检查网络连接和系统资源使用情况
日志分析
日志文件默认位于 ./logs/rule-engine.log,支持不同的日志级别:
- DEBUG:详细调试信息
- INFO:一般操作信息
- WARN:警告信息
- ERROR:错误信息
- FATAL:致命错误
诊断命令
使用API进行系统诊断:
# 获取系统状态
curl http://localhost:8080/api/status
# 获取规则状态
curl http://localhost:8080/api/rules/{id}
# 诊断规则
curl -X POST http://localhost:8080/api/rules/{id}/diagnose
许可证
MIT
