移除默认私钥文件,异步处理报警消息

This commit is contained in:
lin
2026-04-03 17:50:06 +08:00
parent 99d19d0623
commit e51d000588
28 changed files with 538 additions and 579 deletions
+1
View File
@@ -33,3 +33,4 @@ certificates
/docker/wvp/config/jwk.json
/打包/
/snap/
/config/
+6
View File
@@ -411,6 +411,12 @@
<artifactId>commons-pool2</artifactId>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>3.2.3</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
@@ -32,5 +32,7 @@ public class SipConfig {
Integer registerTimeInterval = 120;
private boolean alarm = true;
private long timeout = 1000;
}
@@ -183,7 +183,7 @@ public class UserSetting {
/**
* jwk文件路径,若不指定则使用resources目录下的jwk.json
*/
private String jwkFile = "classpath:jwk.json";
private String jwkFile = null;
/**
* wvp集群模式下如果注册向上级的wvp奔溃,则自动选择一个其他wvp继续注册到上级
@@ -229,6 +229,12 @@ public class UserSetting {
*/
private boolean subscribeMobilePosition = false;
/**
* 处理报警消息时,会缓存通道数据,如果超出则丢弃低热度消息,被丢弃的通道下次使用就需要重新查询数据库,默认10000,
* 建议根据实际情况调整,过大可能会占用较多内存,过小可能会增加数据库查询压力
*/
private long alarmCatchSize = 10000;
}
@@ -104,8 +104,9 @@ public class JwtUtils implements InitializingBean {
}
String jwkFile = userSetting.getJwkFile();
if (jwkFile == null || jwkFile.trim().isEmpty()) {
log.error("[API AUTH] JWK文件路径未配置!");
return createDefaultRsaKey();
log.error("[API AUTH] JWK文件路径未配置!使用默认配置路径:./config/jwk.json");
jwkFile = "config" + File.separator + "jwk.json"; // 默认外部路径
return createAndPersistDefaultRsaKey(jwkFile);
}
// 尝试读取JWK文件(自动处理classpath/本地文件,用try-with-resources自动关流,无泄露)
@@ -131,7 +132,7 @@ public class JwtUtils implements InitializingBean {
log.error("[API AUTH] JWK文件中无有效RSA私钥(仅公钥无法签名JWT)");
} catch (IOException e) {
log.error("[API AUTH] 读取JWK文件失败(路径:{})", jwkFile, e);
log.error("[API AUTH] 读取JWK文件失败(路径:{})", jwkFile);
} catch (Exception e) {
log.error("[API AUTH] 解析JWK文件失败(JSON格式错误或密钥无效)", e);
}
@@ -154,7 +155,7 @@ public class JwtUtils implements InitializingBean {
return resource.getInputStream();
}
// throw new IOException("classpath下JWK文件不存在:" + filePath);
}
}
{
File file = determinePersistPath(jwkFile).toFile();// 外部配置与classpath失败场景下
if (file.exists() && file.canRead()) {
@@ -1,299 +0,0 @@
package com.genersoft.iot.vmp.gb28181.bean;
import com.genersoft.iot.vmp.service.bean.AlarmType;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.util.HashSet;
import java.util.Set;
/**
* @author lin
*/
@Schema(description = "报警信息")
@Data
public class DeviceAlarm {
@Schema(description = "设备的国标编号")
private String deviceId;
@Schema(description = "设备名称")
private String deviceName;
/**
* 通道Id
*/
@Schema(description = "通道的国标编号")
private String channelId;
/**
* 报警级别, 1为一级警情, 2为二级警情, 3为三级警情, 4为四级警情
*/
@Schema(description = "报警级别, 1为一级警情, 2为二级警情, 3为三级警情, 4为四级警情")
private String alarmPriority;
@Schema(description = "报警级别, 1为一级警情, 2为二级警情, 3为三级警情, 4为四级警情")
private String alarmPriorityDescription;
public String getAlarmPriorityDescription() {
switch (alarmPriority) {
case "1":
return "一级警情";
case "2":
return "二级警情";
case "3":
return "三级警情";
case "4":
return "四级警情";
default:
return alarmPriority;
}
}
/**
* 报警方式 , 1为电话报警, 2为设备报警, 3为短信报警, 4为 GPS报警, 5为视频报警, 6为设备故障报警,
* 7其他报警;可以为直接组合如12为电话报警或 设备报警-
*/
@Schema(description = "报警方式 , 1为电话报警, 2为设备报警, 3为短信报警, 4为 GPS报警, 5为视频报警, 6为设备故障报警,\n" +
"\t * 7其他报警;可以为直接组合如12为电话报警或设备报警")
private String alarmMethod;
private String alarmMethodDescription;
public String getAlarmMethodDescription() {
StringBuilder stringBuilder = new StringBuilder();
char[] charArray = alarmMethod.toCharArray();
for (char c : charArray) {
switch (c) {
case '1':
stringBuilder.append("-电话报警");
break;
case '2':
stringBuilder.append("-设备报警");
break;
case '3':
stringBuilder.append("-短信报警");
break;
case '4':
stringBuilder.append("-GPS报警");
break;
case '5':
stringBuilder.append("-视频报警");
break;
case '6':
stringBuilder.append("-设备故障报警");
break;
case '7':
stringBuilder.append("-其他报警");
break;
}
}
stringBuilder.delete(0, 1);
return stringBuilder.toString();
}
/**
* 报警时间
*/
@Schema(description = "报警时间")
private String alarmTime;
/**
* 报警内容描述
*/
@Schema(description = "报警内容描述")
private String alarmDescription;
/**
* 经度
*/
@Schema(description = "经度")
private double longitude;
/**
* 纬度
*/
@Schema(description = "纬度")
private double latitude;
/**
* 报警类型,
* 报警方式为2时,不携带 AlarmType为默认的报警设备报警,
* 携带 AlarmType取值及对应报警类型如下:
* 1-视频丢失报警;
* 2-设备防拆报警;
* 3-存储设备磁盘满报警;
* 4-设备高温报警;
* 5-设备低温报警。
* 报警方式为5时,取值如下:
* 1-人工视频报警;
* 2-运动目标检测报警;
* 3-遗留物检测报警;
* 4-物体移除检测报警;
* 5-绊线检测报警;
* 6-入侵检测报警;
* 7-逆行检测报警;
* 8-徘徊检测报警;
* 9-流量统计报警;
* 10-密度检测报警;
* 11-视频异常检测报警;
* 12-快速移动报警。
* 报警方式为6时,取值下:
* 1-存储设备磁盘故障报警;
* 2-存储设备风扇故障报警。
*/
@Schema(description = "报警类型")
private String alarmType;
public String getAlarmTypeDescription() {
if (alarmType == null) {
return "";
}
char[] charArray = alarmMethod.toCharArray();
Set<String> alarmMethodSet = new HashSet<>();
for (char c : charArray) {
alarmMethodSet.add(Character.toString(c));
}
String result = alarmType;
if (alarmMethodSet.contains("2")) {
switch (alarmType) {
case "1":
result = "视频丢失报警";
break;
case "2":
result = "设备防拆报警";
break;
case "3":
result = "存储设备磁盘满报警";
break;
case "4":
result = "设备高温报警";
break;
case "5":
result = "设备低温报警";
break;
}
}
if (alarmMethodSet.contains("5")) {
switch (alarmType) {
case "1":
result = "人工视频报警";
break;
case "2":
result = "运动目标检测报警";
break;
case "3":
result = "遗留物检测报警";
break;
case "4":
result = "物体移除检测报警";
break;
case "5":
result = "绊线检测报警";
break;
case "6":
result = "入侵检测报警";
break;
case "7":
result = "逆行检测报警";
break;
case "8":
result = "徘徊检测报警";
break;
case "9":
result = "流量统计报警";
break;
case "10":
result = "密度检测报警";
break;
case "11":
result = "视频异常检测报警";
break;
case "12":
result = "快速移动报警";
break;
}
}
if (alarmMethodSet.contains("6")) {
switch (alarmType) {
case "1":
result = "存储设备磁盘故障报警";
break;
case "2":
result = "存储设备风扇故障报警";
break;
}
}
return result;
}
public AlarmType getAlarmTypeEnum() {
if (alarmType == null) {
return null;
}
char[] charArray = alarmMethod.toCharArray();
Set<String> alarmMethodSet = new HashSet<>();
for (char c : charArray) {
alarmMethodSet.add(Character.toString(c));
}
if (alarmMethodSet.contains("2")) {
switch (alarmType) {
case "1":
return AlarmType.VideoLoss;
case "2":
return AlarmType.DeviceTamper;
case "3":
return AlarmType.StorageFull;
case "4":
return AlarmType.DeviceHighTemperature;
case "5":
return AlarmType.DeviceLowTemperature;
}
}
if (alarmMethodSet.contains("5")) {
switch (alarmType) {
case "1":
return AlarmType.ManualVideo;
case "2":
return AlarmType.MotionDetection;
case "3":
return AlarmType.LeftObjectDetection;
case "4":
return AlarmType.ObjectRemovalDetection;
case "5":
return AlarmType.TripwireDetection;
case "6":
return AlarmType.IntrusionDetection;
case "7":
return AlarmType.ReverseDetection;
case "8":
return AlarmType.LoiteringDetection;
case "9":
return AlarmType.FlowStatistics;
case "10":
return AlarmType.DensityDetection;
case "11":
return AlarmType.VideoAbnormal;
case "12":
return AlarmType.RapidMovement;
}
}
if (alarmMethodSet.contains("6")) {
switch (alarmType) {
case "1":
return AlarmType.StorageFault;
case "2":
return AlarmType.StorageFanFault;
}
}
return null;
}
@Schema(description = "报警类型描述")
private String alarmTypeDescription;
@Schema(description = "创建时间")
private String createTime;
}
@@ -0,0 +1,214 @@
package com.genersoft.iot.vmp.gb28181.bean;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import com.genersoft.iot.vmp.service.bean.AlarmType;
import com.genersoft.iot.vmp.utils.DateUtil;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import org.dom4j.Element;
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.*;
/**
* @author lin
*/
@Schema(description = "报警通知")
@Data
public class DeviceAlarmNotify {
@Schema(description = "设备的国标编号")
private String deviceId;
@Schema(description = "设备名称")
private String deviceName;
/**
* 通道Id
*/
@Schema(description = "通道的国标编号")
private String channelId;
/**
* 报警级别, 1为一级警情, 2为二级警情, 3为三级警情, 4为四级警情
*/
@Schema(description = "报警级别, 1为一级警情, 2为二级警情, 3为三级警情, 4为四级警情")
private String alarmPriority;
@Schema(description = "报警级别, 1为一级警情, 2为二级警情, 3为三级警情, 4为四级警情")
private String alarmPriorityDescription;
/**
* 报警方式 , 1为电话报警, 2为设备报警, 3为短信报警, 4为 GPS报警, 5为视频报警, 6为设备故障报警,
* 7其他报警;可以为直接组合如12为电话报警或 设备报警-
*/
@Schema(description = "报警方式 , 1为电话报警, 2为设备报警, 3为短信报警, 4为 GPS报警, 5为视频报警, 6为设备故障报警,\n" +
"\t * 7其他报警;可以为直接组合如12为电话报警或设备报警")
private Integer alarmMethod;
private String alarmMethodDescription;
/**
* 报警时间
*/
@Schema(description = "报警时间")
private String alarmTime;
/**
* 报警内容描述
*/
@Schema(description = "报警内容描述")
private String alarmDescription;
/**
* 经度
*/
@Schema(description = "经度")
private double longitude;
/**
* 纬度
*/
@Schema(description = "纬度")
private double latitude;
/**
* 报警类型,
* 报警方式为2时,不携带 AlarmType为默认的报警设备报警,
* 携带 AlarmType取值及对应报警类型如下:
* 1-视频丢失报警;
* 2-设备防拆报警;
* 3-存储设备磁盘满报警;
* 4-设备高温报警;
* 5-设备低温报警。
* 报警方式为5时,取值如下:
* 1-人工视频报警;
* 2-运动目标检测报警;
* 3-遗留物检测报警;
* 4-物体移除检测报警;
* 5-绊线检测报警;
* 6-入侵检测报警;
* 7-逆行检测报警;
* 8-徘徊检测报警;
* 9-流量统计报警;
* 10-密度检测报警;
* 11-视频异常检测报警;
* 12-快速移动报警。
* 报警方式为6时,取值下:
* 1-存储设备磁盘故障报警;
* 2-存储设备风扇故障报警。
*/
@Schema(description = "报警类型")
private Integer alarmType;
@Schema(description = "事件类型, 在入侵检测报警时可携带")
private Integer eventType;
public AlarmType getAlarmTypeEnum() {
if (alarmType == null) {
return null;
}
if (alarmMethod == DeviceAlarmMethod.Device.getVal()) {
// 2为设备报警,
// 报警方式为2时,
// 不携带 AlarmType为默认的报警设备报警,
// 携带 AlarmType取值及对应报警类型如下:
// 1-视频丢失报警;2-设备防拆报警;3-存储设备磁盘满报警;4-设备高温报警;5-设备低温报警
switch (alarmType) {
case 1:
return AlarmType.VideoLoss;
case 2:
return AlarmType.DeviceTamper;
case 3:
return AlarmType.StorageFull;
case 4:
return AlarmType.DeviceHighTemperature;
case 5:
return AlarmType.DeviceLowTemperature;
}
}
if (alarmMethod == DeviceAlarmMethod.GPS.getVal()) {
// 5为视频报警
// 报警方式为5时,
// 取值如下:
// 1-人工视频报警;2-运动目标检测报警;3-遗留物检测报警;4-物体移除检测报警;5-绊线检测报警;
// 6-入侵检测报警;7-逆行检测报警;8-徘徊检测报警;9-流量统计报警;
// 10-密度检测报警;11-视频异常检测报警;12-快速移动报警。
switch (alarmType) {
case 1:
return AlarmType.ManualVideo;
case 2:
return AlarmType.MotionDetection;
case 3:
return AlarmType.LeftObjectDetection;
case 4:
return AlarmType.ObjectRemovalDetection;
case 5:
return AlarmType.TripwireDetection;
case 6:
return AlarmType.IntrusionDetection;
case 7:
return AlarmType.ReverseDetection;
case 8:
return AlarmType.LoiteringDetection;
case 9:
return AlarmType.FlowStatistics;
case 10:
return AlarmType.DensityDetection;
case 11:
return AlarmType.VideoAbnormal;
case 12:
return AlarmType.RapidMovement;
}
}
if (alarmMethod == DeviceAlarmMethod.DeviceFailure.getVal()) {
switch (alarmType) {
case 1:
return AlarmType.StorageFault;
case 2:
return AlarmType.StorageFanFault;
}
}
return null;
}
@Schema(description = "报警类型描述")
private String alarmTypeDescription;
@Schema(description = "创建时间")
private String createTime;
public static DeviceAlarmNotify fromXml(Element rootElement) {
Element deviceIdElement = rootElement.element("DeviceID");
String channelId = deviceIdElement.getText();
DeviceAlarmNotify deviceAlarm = new DeviceAlarmNotify();
deviceAlarm.setCreateTime(DateUtil.getNow());
deviceAlarm.setChannelId(channelId);
deviceAlarm.setAlarmPriority(getText(rootElement, "AlarmPriority"));
deviceAlarm.setAlarmMethod(getInteger(rootElement, "AlarmMethod"));
String alarmTime = XmlUtil.getText(rootElement, "AlarmTime");
deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime));
deviceAlarm.setAlarmDescription(getText(rootElement, "AlarmDescription"));
Double longitude = getDouble(rootElement, "Longitude");
deviceAlarm.setLongitude(longitude != null ? longitude: 0.00D);
Double latitude = getDouble(rootElement, "Latitude");
deviceAlarm.setLatitude(latitude != null ? latitude: 0.00D);
deviceAlarm.setAlarmType(getInteger(rootElement, "AlarmType"));
Element info = rootElement.element("Info");
if (info != null) {
deviceAlarm.setAlarmType(getInteger(info, "AlarmType"));
Element alarmTypeParam = info.element("AlarmTypeParam");
if (alarmTypeParam != null) {
deviceAlarm.setAlarmDescription(alarmTypeParam.elementText("AlarmDescription"));
}
}
deviceAlarm.setCreateTime(DateUtil.getNow());
return deviceAlarm;
}
}
@@ -1,6 +1,6 @@
package com.genersoft.iot.vmp.gb28181.dao;
import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm;
import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarmNotify;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
@@ -19,7 +19,7 @@ public interface DeviceAlarmMapper {
@Insert("INSERT INTO wvp_device_alarm (device_id, channel_id, alarm_priority, alarm_method, alarm_time, alarm_description, longitude, latitude, alarm_type , create_time ) " +
"VALUES (#{deviceId}, #{channelId}, #{alarmPriority}, #{alarmMethod}, #{alarmTime}, #{alarmDescription}, #{longitude}, #{latitude}, #{alarmType}, #{createTime})")
int add(DeviceAlarm alarm);
int add(DeviceAlarmNotify alarm);
@Select( value = {" <script>" +
@@ -34,8 +34,8 @@ public interface DeviceAlarmMapper {
" <if test=\"endTime != null\" > AND alarm_time &lt;= #{endTime} </if>" +
" ORDER BY alarm_time ASC " +
" </script>"})
List<DeviceAlarm> query(@Param("deviceId") String deviceId, @Param("channelId") String channelId, @Param("alarmPriority") String alarmPriority, @Param("alarmMethod") String alarmMethod,
@Param("alarmType") String alarmType, @Param("startTime") String startTime, @Param("endTime") String endTime);
List<DeviceAlarmNotify> query(@Param("deviceId") String deviceId, @Param("channelId") String channelId, @Param("alarmPriority") String alarmPriority, @Param("alarmMethod") String alarmMethod,
@Param("alarmType") String alarmType, @Param("startTime") String startTime, @Param("endTime") String endTime);
@Delete(" <script>" +
@@ -38,11 +38,10 @@ public class EventPublisher {
/**
* 设备报警事件
* @param deviceAlarm
*/
public void deviceAlarmEventPublish(DeviceAlarm deviceAlarm) {
public void deviceAlarmEventPublish(List<DeviceAlarmNotify> deviceAlarmList) {
DeviceAlarmEvent alarmEvent = new DeviceAlarmEvent(this);
alarmEvent.setAlarmInfo(deviceAlarm);
alarmEvent.setDeviceAlarmList(deviceAlarmList);
applicationEventPublisher.publishEvent(alarmEvent);
}
@@ -1,16 +1,20 @@
package com.genersoft.iot.vmp.gb28181.event.alarm;
import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm;
import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarmNotify;
import lombok.Getter;
import lombok.Setter;
import org.springframework.context.ApplicationEvent;
import java.io.Serial;
import java.util.List;
/**
* @description: 报警事件
* @author: lawrencehj
* @data: 2021-01-20
*/
@Getter
@Setter
public class DeviceAlarmEvent extends ApplicationEvent {
@Serial
@@ -20,13 +24,6 @@ public class DeviceAlarmEvent extends ApplicationEvent {
super(source);
}
private DeviceAlarm deviceAlarm;
private List<DeviceAlarmNotify> deviceAlarmList;
public DeviceAlarm getAlarmInfo() {
return deviceAlarm;
}
public void setAlarmInfo(DeviceAlarm deviceAlarm) {
this.deviceAlarm = deviceAlarm;
}
}
@@ -21,9 +21,9 @@ public class SseSessionManager {
@Autowired
private DynamicTask dynamicTask;
public SseEmitter conect(String browserId){
public SseEmitter conect(String browserId) {
SseEmitter sseEmitter = new SseEmitter(0L);
sseEmitter.onError((err)-> {
sseEmitter.onError((err) -> {
log.error("[SSE推送] 连接错误, 浏览器 ID: {}, {}", browserId, err.getMessage());
sseSessionMap.remove(browserId);
sseEmitter.completeWithError(err);
@@ -48,16 +48,19 @@ public class SseSessionManager {
}
@Scheduled(fixedRate = 1000) //每1秒执行一次
public void execute(){
if (sseSessionMap.isEmpty()){
public void execute() {
if (sseSessionMap.isEmpty()) {
return;
}
sendForAll("keepalive", "alive");
}
@Async
@org.springframework.context.event.EventListener
public void onApplicationEvent(DeviceAlarmEvent event) {
sendForAll("message", event.getAlarmInfo());
event.getDeviceAlarmList().forEach(
notify -> sendForAll("message", notify)
);
}
@@ -66,7 +69,8 @@ public class SseSessionManager {
SseEmitter sseEmitter = sseSessionMap.get(browserId);
if (sseEmitter == null) {
continue;
};
}
;
try {
sseEmitter.send(SseEmitter.event().name(event).data(data));
} catch (Exception e) {
@@ -4,7 +4,7 @@ import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm;
import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarmNotify;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
@@ -304,7 +304,7 @@ public interface ISIPCommander {
* @param deviceAlarm 报警信息信息
* @return
*/
void sendAlarmMessage(Device device, DeviceAlarm deviceAlarm) throws InvalidArgumentException, SipException, ParseException;
void sendAlarmMessage(Device device, DeviceAlarmNotify deviceAlarm) throws InvalidArgumentException, SipException, ParseException;
}
@@ -3,7 +3,6 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
@@ -102,7 +101,7 @@ public interface ISIPCommanderForPlatform {
* @param deviceAlarm 报警信息信息
* @return
*/
void sendAlarmMessage(Platform parentPlatform, DeviceAlarm deviceAlarm) throws SipException, InvalidArgumentException, ParseException;
void sendAlarmMessage(Platform parentPlatform, DeviceAlarmNotify deviceAlarm) throws SipException, InvalidArgumentException, ParseException;
/**
* 回复catalog事件-增加/更新
@@ -1408,7 +1408,7 @@ public class SIPCommander implements ISIPCommander {
}
@Override
public void sendAlarmMessage(Device device, DeviceAlarm deviceAlarm) throws InvalidArgumentException, SipException, ParseException {
public void sendAlarmMessage(Device device, DeviceAlarmNotify deviceAlarm) throws InvalidArgumentException, SipException, ParseException {
if (device == null) {
return;
}
@@ -2,7 +2,6 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl;
import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
@@ -15,9 +14,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderPlarformProvider;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IReceiveRtpServerService;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
@@ -396,7 +393,7 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform {
}
@Override
public void sendAlarmMessage(Platform parentPlatform, DeviceAlarm deviceAlarm) throws SipException, InvalidArgumentException, ParseException {
public void sendAlarmMessage(Platform parentPlatform, DeviceAlarmNotify deviceAlarm) throws SipException, InvalidArgumentException, ParseException {
if (parentPlatform == null) {
return;
}
@@ -0,0 +1,141 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.sip.RequestEvent;
import javax.sip.header.FromHeader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* SIP命令类型: NOTIFY请求中的报警通知请求处理
*/
@Slf4j
@Component
public class NotifyRequestForAlarm extends SIPRequestProcessorParent {
private final ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
@Autowired
private UserSetting userSetting;
@Autowired
private EventPublisher eventPublisher;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private IDeviceChannelService deviceChannelService;
public void process(RequestEvent evt) {
if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
log.error("[notify-报警订阅] 待处理消息队列已满 {},返回486 BUSY_HERE", userSetting.getMaxNotifyCountQueue());
return;
}
taskQueue.offer(new HandlerCatchData(evt, null, null));
}
@Scheduled(fixedDelay = 400) //每400毫秒执行一次
@Async
public void executeTaskQueue(){
if (taskQueue.isEmpty()) {
return;
}
List<HandlerCatchData> handlerCatchDataList = new ArrayList<>();
int size = taskQueue.size();
for (int i = 0; i < size; i++) {
HandlerCatchData poll = taskQueue.poll();
if (poll != null) {
handlerCatchDataList.add(poll);
}
}
if (handlerCatchDataList.isEmpty()) {
return;
}
List<DeviceAlarmNotify> deviceAlarmList = new ArrayList<>();
for (HandlerCatchData take : handlerCatchDataList) {
if (take == null) {
continue;
}
RequestEvent evt = take.getEvt();
try {
FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
Element rootElement = getRootElement(evt);
if (rootElement == null) {
log.error("处理alarm设备报警Notify时未获取到消息体{}", evt.getRequest());
return;
}
String channelId = rootElement.elementText("DeviceID");
Device device = redisCatchStorage.getDevice(deviceId);
if (device == null) {
log.warn("[ NotifyAlarm ] 未找到设备:{}", deviceId);
return;
}
rootElement = getRootElement(evt, device.getCharset());
if (rootElement == null) {
log.warn("[ NotifyAlarm ] content cannot be null, {}", evt.getRequest());
return;
}
DeviceAlarmNotify deviceAlarmNotify = DeviceAlarmNotify.fromXml(rootElement);
deviceAlarmNotify.setDeviceId(deviceId);
deviceAlarmNotify.setDeviceName(device.getName());
log.info("[收到Notify-Alarm]{}/{}", device.getDeviceId(), deviceAlarmNotify.getChannelId());
if (deviceAlarmNotify.getAlarmMethod() != null && deviceAlarmNotify.getAlarmMethod() == DeviceAlarmMethod.GPS.getVal()) { // GPS报警
DeviceChannel deviceChannel = deviceChannelService.getOne(device.getDeviceId(), channelId);
if (deviceChannel == null) {
log.warn("[解析报警通知] 未找到通道:{}/{}", device.getDeviceId(), channelId);
}else {
MobilePosition mobilePosition = new MobilePosition();
mobilePosition.setChannelId(deviceChannel.getId());
mobilePosition.setChannelDeviceId(deviceChannel.getDeviceId());
mobilePosition.setCreateTime(DateUtil.getNow());
mobilePosition.setDeviceId(deviceAlarmNotify.getDeviceId());
mobilePosition.setTime(deviceAlarmNotify.getAlarmTime());
mobilePosition.setLongitude(deviceAlarmNotify.getLongitude());
mobilePosition.setLatitude(deviceAlarmNotify.getLatitude());
mobilePosition.setReportSource("GPS Alarm");
// 更新device channel 的经纬度
deviceChannel.setLongitude(mobilePosition.getLongitude());
deviceChannel.setLatitude(mobilePosition.getLatitude());
deviceChannel.setGpsTime(mobilePosition.getTime());
deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
}
}
// 回复200 OK
if (redisCatchStorage.deviceIsOnline(deviceId)) {
deviceAlarmList.add(deviceAlarmNotify);
}
} catch (DocumentException e) {
log.error("未处理的异常 ", e);
}
}
if (deviceAlarmList.isEmpty()) {
return;
}
eventPublisher.deviceAlarmEventPublish(deviceAlarmList);
}
}
@@ -56,10 +56,6 @@ public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent
@Autowired
private IGbChannelService channelService;
// @Scheduled(fixedRate = 2000) //每400毫秒执行一次
// public void showSize(){
// log.warn("[notify-目录订阅] 待处理消息数量: {}", taskQueue.size() );
// }
public void process(RequestEvent evt) {
if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
@@ -1,17 +1,10 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.bean.CmdType;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.extern.slf4j.Slf4j;
import org.dom4j.DocumentException;
@@ -23,7 +16,6 @@ import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.header.FromHeader;
import javax.sip.message.Response;
import java.text.ParseException;
@@ -34,29 +26,20 @@ import java.text.ParseException;
@Component
public class NotifyRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor {
@Autowired
private SipConfig sipConfig;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private EventPublisher publisher;
private final String method = "NOTIFY";
@Autowired
private SIPProcessorObserver sipProcessorObserver;
@Autowired
private IDeviceChannelService deviceChannelService;
@Autowired
private NotifyRequestForCatalogProcessor notifyRequestForCatalogProcessor;
@Autowired
private NotifyRequestForMobilePositionProcessor notifyRequestForMobilePositionProcessor;
@Autowired
private NotifyRequestForAlarm notifyRequestForAlarm;
@Override
public void afterPropertiesSet() throws Exception {
// 添加消息处理的订阅
@@ -77,105 +60,16 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
if (CmdType.CATALOG.equals(cmd)) {
notifyRequestForCatalogProcessor.process(evt);
} else if (CmdType.ALARM.equals(cmd)) {
processNotifyAlarm(evt);
notifyRequestForAlarm.process(evt);
} else if (CmdType.MOBILE_POSITION.equals(cmd)) {
notifyRequestForMobilePositionProcessor.process(evt);
} else {
log.info("接收到消息:" + cmd);
log.info("[Notify] 收到位置类型消息:{}, \r\n {}", cmd, evt.getRequest());
}
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("未处理的异常 ", e);
} catch (DocumentException e) {
throw new RuntimeException(e);
}
}
/***
* 处理alarm设备报警Notify
*/
private void processNotifyAlarm(RequestEvent evt) {
log.info("[收到Notify-Alarm]{}", evt.getRequest());
try {
FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
Element rootElement = getRootElement(evt);
if (rootElement == null) {
log.error("处理alarm设备报警Notify时未获取到消息体{}", evt.getRequest());
return;
}
Element deviceIdElement = rootElement.element("DeviceID");
String channelId = deviceIdElement.getText().toString();
Device device = redisCatchStorage.getDevice(deviceId);
if (device == null) {
log.warn("[ NotifyAlarm ] 未找到设备:{}", deviceId);
return;
}
rootElement = getRootElement(evt, device.getCharset());
if (rootElement == null) {
log.warn("[ NotifyAlarm ] content cannot be null, {}", evt.getRequest());
return;
}
DeviceAlarm deviceAlarm = new DeviceAlarm();
deviceAlarm.setDeviceId(deviceId);
deviceAlarm.setDeviceName(device.getName());
deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority"));
deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod"));
String alarmTime = XmlUtil.getText(rootElement, "AlarmTime");
if (alarmTime == null) {
log.warn("[ NotifyAlarm ] AlarmTime cannot be null");
return;
}
deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime));
if (XmlUtil.getText(rootElement, "AlarmDescription") == null) {
deviceAlarm.setAlarmDescription("");
} else {
deviceAlarm.setAlarmDescription(XmlUtil.getText(rootElement, "AlarmDescription"));
}
if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Longitude"))) {
deviceAlarm.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude")));
} else {
deviceAlarm.setLongitude(0.00);
}
if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Latitude"))) {
deviceAlarm.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude")));
} else {
deviceAlarm.setLatitude(0.00);
}
log.info("[收到Notify-Alarm]{}/{}", device.getDeviceId(), deviceAlarm.getChannelId());
if ("4".equals(deviceAlarm.getAlarmMethod())) { // GPS报警
DeviceChannel deviceChannel = deviceChannelService.getOne(device.getDeviceId(), channelId);
if (deviceChannel == null) {
log.warn("[解析报警通知] 未找到通道:{}/{}", device.getDeviceId(), channelId);
}else {
MobilePosition mobilePosition = new MobilePosition();
mobilePosition.setChannelId(deviceChannel.getId());
mobilePosition.setChannelDeviceId(deviceChannel.getDeviceId());
mobilePosition.setCreateTime(DateUtil.getNow());
mobilePosition.setDeviceId(deviceAlarm.getDeviceId());
mobilePosition.setTime(deviceAlarm.getAlarmTime());
mobilePosition.setLongitude(deviceAlarm.getLongitude());
mobilePosition.setLatitude(deviceAlarm.getLatitude());
mobilePosition.setReportSource("GPS Alarm");
// 更新device channel 的经纬度
deviceChannel.setLongitude(mobilePosition.getLongitude());
deviceChannel.setLatitude(mobilePosition.getLatitude());
deviceChannel.setGpsTime(mobilePosition.getTime());
deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
}
}
// 回复200 OK
if (redisCatchStorage.deviceIsOnline(deviceId)) {
publisher.deviceAlarmEventPublish(deviceAlarm);
}
} catch (DocumentException e) {
log.error("未处理的异常 ", e);
}
}
}
@@ -10,8 +10,6 @@ import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler;
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import gov.nist.javax.sip.message.SIPRequest;
@@ -21,7 +19,6 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
@@ -32,8 +29,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
/**
* 报警事件的处理,参考:9.4
*/
@@ -99,62 +94,34 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme
if (handlerCatchDataList.isEmpty()) {
return;
}
List<DeviceAlarmNotify> deviceAlarmList = new ArrayList<>();
for (SipMsgInfo sipMsgInfo : handlerCatchDataList) {
if (sipMsgInfo == null) {
if (sipMsgInfo == null || sipMsgInfo.getDevice() == null) {
continue;
}
RequestEvent evt = sipMsgInfo.getEvt();
try {
DeviceAlarmNotify deviceAlarmNotify = DeviceAlarmNotify.fromXml(sipMsgInfo.getRootElement());
Device device = sipMsgInfo.getDevice();
System.out.println(device.getHostAddress() + ": " + evt.getRequest());
Element deviceIdElement = sipMsgInfo.getRootElement().element("DeviceID");
String channelId = deviceIdElement.getText();
DeviceAlarm deviceAlarm = new DeviceAlarm();
deviceAlarm.setCreateTime(DateUtil.getNow());
deviceAlarm.setDeviceId(sipMsgInfo.getDevice().getDeviceId());
deviceAlarm.setDeviceName(sipMsgInfo.getDevice().getName());
deviceAlarm.setChannelId(channelId);
deviceAlarm.setAlarmPriority(getText(sipMsgInfo.getRootElement(), "AlarmPriority"));
deviceAlarm.setAlarmMethod(getText(sipMsgInfo.getRootElement(), "AlarmMethod"));
String alarmTime = XmlUtil.getText(sipMsgInfo.getRootElement(), "AlarmTime");
if (alarmTime == null) {
continue;
if (log.isDebugEnabled()) {
log.debug("[收到报警通知]设备:{}, 内容:{}", device.getDeviceId(), JSON.toJSONString(deviceAlarmNotify));
}
deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime));
String alarmDescription = getText(sipMsgInfo.getRootElement(), "AlarmDescription");
if (alarmDescription == null) {
deviceAlarm.setAlarmDescription("");
} else {
deviceAlarm.setAlarmDescription(alarmDescription);
}
String longitude = getText(sipMsgInfo.getRootElement(), "Longitude");
if (longitude != null && NumericUtil.isDouble(longitude)) {
deviceAlarm.setLongitude(Double.parseDouble(longitude));
} else {
deviceAlarm.setLongitude(0.00);
}
String latitude = getText(sipMsgInfo.getRootElement(), "Latitude");
if (latitude != null && NumericUtil.isDouble(latitude)) {
deviceAlarm.setLatitude(Double.parseDouble(latitude));
} else {
deviceAlarm.setLatitude(0.00);
}
if (!ObjectUtils.isEmpty(deviceAlarm.getAlarmMethod()) && deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.GPS.getVal() + "")) {
DeviceChannel deviceChannel = deviceChannelService.getOne(device.getDeviceId(), channelId);
deviceAlarmNotify.setDeviceId(device.getDeviceId());
deviceAlarmNotify.setDeviceName(device.getName());
if (deviceAlarmNotify.getAlarmMethod() != null && deviceAlarmNotify.getAlarmMethod() == DeviceAlarmMethod.GPS.getVal()) {
DeviceChannel deviceChannel = deviceChannelService.getOne(device.getDeviceId(), deviceAlarmNotify.getChannelId());
if (deviceChannel == null) {
log.warn("[解析报警消息] 未找到通道:{}/{}", device.getDeviceId(), channelId);
log.warn("[解析报警消息] 未找到通道:{}/{}", device.getDeviceId(), deviceAlarmNotify.getChannelId());
} else {
MobilePosition mobilePosition = new MobilePosition();
mobilePosition.setCreateTime(DateUtil.getNow());
mobilePosition.setDeviceId(deviceAlarm.getDeviceId());
mobilePosition.setDeviceId(device.getDeviceId());
mobilePosition.setChannelId(deviceChannel.getId());
mobilePosition.setChannelDeviceId(deviceChannel.getDeviceId());
mobilePosition.setTime(deviceAlarm.getAlarmTime());
mobilePosition.setLongitude(deviceAlarm.getLongitude());
mobilePosition.setLatitude(deviceAlarm.getLatitude());
mobilePosition.setTime(deviceAlarmNotify.getAlarmTime());
mobilePosition.setLongitude(deviceAlarmNotify.getLongitude());
mobilePosition.setLatitude(deviceAlarmNotify.getLatitude());
mobilePosition.setReportSource("GPS Alarm");
// 更新device channel 的经纬度
@@ -165,39 +132,33 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme
deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
}
}
if (!ObjectUtils.isEmpty(deviceAlarm.getDeviceId())) {
if (deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.Video.getVal() + "")) {
deviceAlarm.setAlarmType(getText(sipMsgInfo.getRootElement().element("Info"), "AlarmType"));
}
}
if (log.isDebugEnabled()) {
log.debug("[收到报警通知]设备:{}, 内容:{}", device.getDeviceId(), JSON.toJSONString(deviceAlarm));
}
// 作者自用判断,其他小伙伴需要此消息可以自行修改,但是不要提在pr里
if (DeviceAlarmMethod.Other.getVal() == Integer.parseInt(deviceAlarm.getAlarmMethod())) {
if (deviceAlarmNotify.getAlarmMethod() != null
&& DeviceAlarmMethod.Other.getVal() == deviceAlarmNotify.getAlarmMethod()) {
// 发送给平台的报警信息。 发送redis通知
log.info("[发送给平台的报警信息]内容:{}", JSONObject.toJSONString(deviceAlarm));
log.info("[发送给平台的报警信息]内容:{}", JSONObject.toJSONString(deviceAlarmNotify));
AlarmChannelMessage alarmChannelMessage = new AlarmChannelMessage();
if (deviceAlarm.getAlarmMethod() != null) {
alarmChannelMessage.setAlarmSn(Integer.parseInt(deviceAlarm.getAlarmMethod()));
}
alarmChannelMessage.setAlarmDescription(deviceAlarm.getAlarmDescription());
if (deviceAlarm.getAlarmType() != null) {
alarmChannelMessage.setAlarmType(Integer.parseInt(deviceAlarm.getAlarmType()));
}
alarmChannelMessage.setGbId(channelId);
alarmChannelMessage.setAlarmSn(deviceAlarmNotify.getAlarmMethod());
alarmChannelMessage.setAlarmDescription(deviceAlarmNotify.getAlarmDescription());
alarmChannelMessage.setAlarmType(deviceAlarmNotify.getAlarmType());
alarmChannelMessage.setGbId(deviceAlarmNotify.getChannelId());
redisCatchStorage.sendAlarmMsg(alarmChannelMessage);
continue;
}
if (redisCatchStorage.deviceIsOnline(sipMsgInfo.getDevice().getDeviceId())) {
publisher.deviceAlarmEventPublish(deviceAlarm);
deviceAlarmList.add(deviceAlarmNotify);
}
} catch (Exception e) {
log.error("未处理的异常 ", e);
log.warn("[收到报警通知] 发现未处理的异常, {}\r\n{}", e.getMessage(), evt.getRequest());
}
}
if (deviceAlarmList.isEmpty()) {
return;
}
publisher.deviceAlarmEventPublish(deviceAlarmList);
}
@Override
@@ -211,57 +172,18 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme
}
Element deviceIdElement = rootElement.element("DeviceID");
String channelId = deviceIdElement.getText();
DeviceAlarm deviceAlarm = new DeviceAlarm();
deviceAlarm.setCreateTime(DateUtil.getNow());
deviceAlarm.setDeviceId(parentPlatform.getServerGBId());
deviceAlarm.setDeviceName(parentPlatform.getName());
deviceAlarm.setChannelId(channelId);
deviceAlarm.setAlarmPriority(getText(rootElement, "AlarmPriority"));
deviceAlarm.setAlarmMethod(getText(rootElement, "AlarmMethod"));
String alarmTime = XmlUtil.getText(rootElement, "AlarmTime");
if (alarmTime == null) {
return;
}
deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime));
String alarmDescription = getText(rootElement, "AlarmDescription");
if (alarmDescription == null) {
deviceAlarm.setAlarmDescription("");
} else {
deviceAlarm.setAlarmDescription(alarmDescription);
}
String longitude = getText(rootElement, "Longitude");
if (longitude != null && NumericUtil.isDouble(longitude)) {
deviceAlarm.setLongitude(Double.parseDouble(longitude));
} else {
deviceAlarm.setLongitude(0.00);
}
String latitude = getText(rootElement, "Latitude");
if (latitude != null && NumericUtil.isDouble(latitude)) {
deviceAlarm.setLatitude(Double.parseDouble(latitude));
} else {
deviceAlarm.setLatitude(0.00);
}
if (!ObjectUtils.isEmpty(deviceAlarm.getAlarmMethod())) {
if (deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.Video.getVal() + "")) {
deviceAlarm.setAlarmType(getText(rootElement.element("Info"), "AlarmType"));
}
}
DeviceAlarmNotify deviceAlarmNotify = DeviceAlarmNotify.fromXml(rootElement);
deviceAlarmNotify.setDeviceId(parentPlatform.getServerGBId());
deviceAlarmNotify.setDeviceName(parentPlatform.getName());
deviceAlarmNotify.setChannelId(channelId);
if (channelId.equals(parentPlatform.getDeviceGBId())) {
// 发送给平台的报警信息。 发送redis通知
AlarmChannelMessage alarmChannelMessage = new AlarmChannelMessage();
if (deviceAlarm.getAlarmMethod() != null) {
alarmChannelMessage.setAlarmSn(Integer.parseInt(deviceAlarm.getAlarmMethod()));
}
alarmChannelMessage.setAlarmDescription(deviceAlarm.getAlarmDescription());
alarmChannelMessage.setAlarmSn(deviceAlarmNotify.getAlarmMethod());
alarmChannelMessage.setAlarmDescription(deviceAlarmNotify.getAlarmDescription());
alarmChannelMessage.setGbId(channelId);
if (deviceAlarm.getAlarmType() != null) {
alarmChannelMessage.setAlarmType(Integer.parseInt(deviceAlarm.getAlarmType()));
}
alarmChannelMessage.setAlarmType(deviceAlarmNotify.getAlarmType());
redisCatchStorage.sendAlarmMsg(alarmChannelMessage);
}
}
@@ -13,7 +13,6 @@ import com.genersoft.iot.vmp.jt1078.service.Ijt1078Service;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.units.qual.A;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
@@ -16,7 +16,6 @@ import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.units.qual.A;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
@@ -1,6 +1,8 @@
package com.genersoft.iot.vmp.service.bean;
import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarmNotify;
import com.genersoft.iot.vmp.utils.DateUtil;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -34,10 +36,10 @@ public class Alarm {
private String recordPath;
@Schema(description = "报警附带的经度")
private String longitude;
private Double longitude;
@Schema(description = "报警附带的纬度")
private String latitude;
private Double latitude;
@Schema(description = "报警类别")
private AlarmType alarmType;
@@ -45,6 +47,16 @@ public class Alarm {
@Schema(description = "报警时间")
private Long alarmTime;
public static Alarm buildFromDeviceAlarmNotify(DeviceAlarmNotify deviceAlarmNotify) {
Alarm alarm = new Alarm();
alarm.setDescription(deviceAlarmNotify.getAlarmDescription());
alarm.setAlarmType(deviceAlarmNotify.getAlarmTypeEnum());
alarm.setAlarmTime(DateUtil.yyyy_MM_dd_HH_mm_ssToTimestampMs(deviceAlarmNotify.getAlarmTime()));
alarm.setLongitude(deviceAlarmNotify.getLongitude());
alarm.setLatitude(deviceAlarmNotify.getLatitude());
return alarm;
}
}
@@ -1,20 +1,31 @@
package com.genersoft.iot.vmp.service.impl;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarmNotify;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.event.alarm.DeviceAlarmEvent;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.service.IAlarmService;
import com.genersoft.iot.vmp.service.bean.Alarm;
import com.genersoft.iot.vmp.service.bean.AlarmType;
import com.genersoft.iot.vmp.storager.dao.AlarmMapper;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
@Service
@RequiredArgsConstructor
@@ -22,16 +33,68 @@ public class AlarmServiceImpl implements IAlarmService {
private final AlarmMapper alarmMapper;
private final UserSetting userSetting;
private final SipConfig sipConfig;
private final IDeviceChannelService deviceChannelService;
// 使用Caffeine缓存设备通道信息,避免频繁查询数据库,提升性能
private Cache<String, DeviceChannel> channelCache = null;
private final ConcurrentLinkedQueue<Alarm> alarmQueue = new ConcurrentLinkedQueue<>();
@PostConstruct
public void init() {
// 初始化Caffeine缓存,设置合理的过期时间和最大容量
channelCache = Caffeine.newBuilder()
.maximumSize(userSetting.getAlarmCatchSize()) // 固定容量
.build();
}
@Async
@EventListener
public void onApplicationEvent(DeviceAlarmEvent event) {
if (channelCache == null || !sipConfig.isAlarm()) {
return;
}
// 处理国标的报警事件,转换为通用的Alarm对象后缓存,在定时任务中批量保存到数据库
Alarm alarm = new Alarm();
alarm.setChannelId(event.getAlarmInfo().getChannelId());
alarm.setChannelId(deviceAlarmEvent.getAlarmInfo().getChannelId());
alarm.setAlarmType(AlarmType.valueOf(deviceAlarmEvent.getAlarmInfo().getAlarmType()));
alarm.setAlarmTime(deviceAlarmEvent.getAlarmInfo().getAlarmTime());
alarm.setAlarmInfo(deviceAlarmEvent.getAlarmInfo().getAlarmInfo());
if (event.getDeviceAlarmList().isEmpty()) {
return;
}
for (DeviceAlarmNotify notify : event.getDeviceAlarmList()) {
Alarm alarm = Alarm.buildFromDeviceAlarmNotify(notify);
String key = notify.getDeviceId() + notify.getChannelId();
DeviceChannel deviceChannel = channelCache.get(key, k -> deviceChannelService.getOneForSource(notify.getDeviceId(), notify.getChannelId()));
if (deviceChannel == null) {
continue;
}
alarm.setChannelId(deviceChannel.getId());
// 分配一个快照路径,后续在去补充快照文件
alarm.setSnapPath("/snap/alarm_" + alarm.getId() + ".jpg");
alarmQueue.offer(alarm);
}
}
@Scheduled(fixedDelay = 500)
public void executeTaskQueue() {
if (alarmQueue.isEmpty()) {
return;
}
List<Alarm> handlerCatchDataList = new ArrayList<>();
int size = alarmQueue.size();
for (int i = 0; i < size; i++) {
Alarm poll = alarmQueue.poll();
if (poll != null) {
handlerCatchDataList.add(poll);
}
}
if (handlerCatchDataList.isEmpty()) {
return;
}
// 批量保存到数据库
alarmMapper.insertAlarms(handlerCatchDataList);
// 异步处理快照的生成和保存,避免影响报警信息的保存效率
}
@@ -4,7 +4,7 @@ import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.AlarmChannelMessage;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm;
import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarmNotify;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
@@ -95,12 +95,12 @@ public class RedisAlarmMsgListener implements MessageListener {
}
String chanelId = alarmChannelMessage.getGbId();
DeviceAlarm deviceAlarm = new DeviceAlarm();
DeviceAlarmNotify deviceAlarm = new DeviceAlarmNotify();
deviceAlarm.setCreateTime(DateUtil.getNow());
deviceAlarm.setChannelId(chanelId);
deviceAlarm.setAlarmDescription(alarmChannelMessage.getAlarmDescription());
deviceAlarm.setAlarmMethod("" + alarmChannelMessage.getAlarmSn());
deviceAlarm.setAlarmType("" + alarmChannelMessage.getAlarmType());
deviceAlarm.setAlarmMethod(alarmChannelMessage.getAlarmSn());
deviceAlarm.setAlarmType(alarmChannelMessage.getAlarmType());
deviceAlarm.setAlarmPriority("1");
deviceAlarm.setAlarmTime(DateUtil.getNow());
deviceAlarm.setLongitude(0);
@@ -42,4 +42,7 @@ public interface AlarmMapper {
"</foreach>" +
"</script>")
void deleteAlarms(@Param("ids") List<Long> ids);
void insertAlarms(List<Alarm> handlerCatchDataList);
}
-1
View File
@@ -1 +0,0 @@
{"keys":[{"kty":"RSA","kid":"3e79646c4dbc408383a9eed09f2b85ae","n":"rThRAlbMRceko3NkymeSoN2ICVaDlNBLWv3cyLUeixjWcmuhnPv2JpXmgoxezKZfhH_0sChBof--BaaqSUukl9wWMW1bWCyFyU5qNczhQk3ANlhaLiSgXsqD-NKI3ObJjB-26fnOZb9QskCqrPW1lEtwgb9-skMAfGlh5kaDOKjYKI64DPSMMXpSiJEDM-7DK-TFfm0QfPcoH-k-1C02NHlGWehVUn9FUJ0TAiDxpKj28qOmYh7s1M7OU_h-Sso7LM-5zbftpcO6SINe81Gw9JPd7rKPCRxkw8ROSCCq-JH_zshM80kTK2nWcseGvhQ_4vKQIBp9PrAgCrGJHM160w","e":"AQAB","d":"AwS2NKo6iQS_k7GREg3X-kGh-zest00h4wYFcOHnFFlsczX47PlfArEeASxdAofrpi1soB0zd5UzRHnxAbH1vkexg076hoDQG__nzeQyEKu2K7xCZgdxW_V_cziH9gF3hZ-P2mfl9tPsng6OatElRt5BqaEingyY15ImiJK1-qi_LTx4gfwRfquKLbUgqJR4Tf6eKlwOzEo41Ilo26gnojNzWryB_XHG7lj6SngPDBJp7ty32je4Fv3A3hXt7JHDwloww6-xiRtUflDpSec4A-o-PHgbfoYLyM7mM4BDt4PM54EHm4u8WzypG0wNKDTiq4KSapei5xDbiG3RpngvAQ","p":"5kUHkGxnZvZT762Ex-0De2nYodAbbZNVR-eIPx2ng2VZmEbAU3cp_DxigpXWyQ0FwJ2Me8GvxnlbxJ7k7d-4AV2X8q6Q-UqXajHdudRU_QX05kPEgZ3xtPk5ekI0-u1BEQT7pY_gxlZC2mzXAcVLd-LwbVPuQEba5S4JMsjcHUE","q":"wJNa06-qZ2tWncGl7cfJdO-SJ_H3taowMhh-RsJmeVefjjN3pfVjjE0wG_rIP-BjjCB9OhvSnI8LDjoNu8uIg090DYnA6IUfZpWo3zjgedeyqQyXFVjjVQkn98zgp5NFLpuitZsl9-EHhh7JaZDCwaJ527MN3VCoQxeI75ggjxM","dp":"HQTH_kBbC5OxYjwIxrUswinFnia-viFaFvSrq-CN0rY8Az-vTxVuWhY2B-TgK3gTqIFyScpP34A9u1qW2Q9fffSQiInNRU1MJZrhKWED0NsmULprkjYYVsktoCWlzZWGpKFvIR8voW8Pf71FnziA2TvlNrHkDX-gaE9T422Cp8E","dq":"owJYqMWS1dYLTKBlx0ANbHl6W2u7xb_Y6h7HjTfzLBWazvEL_6QW7uVLqvN-XGuheDTsK6rvfWyr7BACHgvsc1JnJyqK64f8C4b1mnZ3tUt7RROONBi43ftRJLX9GHxV3F0LvvQkkI2gI8ydq0lJQkU5J1qKiuNCewBJ_p3kOZc","qi":"hNAZV6aWEEWfB1HkrfdtO6sjq9ceEod55ez82I1ZNgoKle8gpRkh3vw2EIJ_5lcw57s5rw8G-sCQPG1AQSZ6u9aURwHkIXjpIhLAlv6gvKkCh0smPPvnSiltJKOJsuHkrD6rGkV1f-MlCS51lKlk9xShQzkRidkNd4BUh0a7ktA"}]}
+3
View File
@@ -265,6 +265,9 @@ user-settings:
# 这允许服务器防止客户端发起的基于 TCP 的拒绝服务攻击(即发起数百个客户端事务)。
# 如果为 true(默认作),则堆栈将保持套接字打开,以便以牺牲线程和内存资源为代价来最大化性能 - 使自身容易受到 DOS 攻击。
sip-cache-server-connections: true
# 处理报警消息时,会缓存通道数据,如果超出则丢弃低热度消息,被丢弃的通道下次使用就需要重新查询数据库,默认10000,
# 建议根据实际情况调整,过大可能会占用较多内存,过小可能会增加数据库查询压力,
alarm-catch-size: 10000
# 关闭在线文档(生产环境建议关闭)
springdoc:
+1
View File
@@ -0,0 +1 @@