Эх сурвалжийг харах

后管-系统公告模块与websocket公告推送

qianxb 11 сар өмнө
parent
commit
c5772ca00b
26 өөрчлөгдсөн 1069 нэмэгдсэн , 61 устгасан
  1. 27 0
      blade-common/src/main/java/org/springblade/common/constant/WebsocketMsgConstant.java
  2. 5 3
      blade-service-api/blade-business-api/src/main/java/org/springblade/business/feign/BusinessWebSocketClient.java
  3. 5 0
      blade-service-api/blade-manager-api/pom.xml
  4. 90 0
      blade-service-api/blade-manager-api/src/main/java/org/springblade/manager/entity/SystemMsg.java
  5. 5 3
      blade-service-api/blade-manager-api/src/main/java/org/springblade/manager/feign/ManagerWebSocketClient.java
  6. 39 0
      blade-service-api/blade-manager-api/src/main/java/org/springblade/manager/vo/SystemAwaitMsgVO.java
  7. 19 0
      blade-service-api/blade-manager-api/src/main/java/org/springblade/manager/vo/SystemMsgVO.java
  8. 11 1
      blade-service-api/blade-websocket-api/src/main/java/org/springblade/websocket/feign/WebSocketClient.java
  9. 5 1
      blade-service-api/blade-websocket-api/src/main/java/org/springblade/websocket/vo/MsgVO.java
  10. 26 0
      blade-service-api/blade-websocket-api/src/main/java/org/springblade/websocket/vo/SystMsgVO.java
  11. 3 0
      blade-service-api/blade-websocket-api/src/main/java/org/springblade/websocket/vo/UserInfoVO.java
  12. 42 0
      blade-service-api/blade-websocket-api/src/main/java/org/springblade/websocket/vo/UserSingleVO.java
  13. 7 8
      blade-service/blade-business/src/main/java/org/springblade/business/feignClient/BusinessWebSocketClientImpl.java
  14. 185 0
      blade-service/blade-manager/src/main/java/org/springblade/manager/controller/SystemMsgController.java
  15. 10 6
      blade-service/blade-manager/src/main/java/org/springblade/manager/feign/ManagerWebSocketClientImpl.java
  16. 88 0
      blade-service/blade-manager/src/main/java/org/springblade/manager/job/SystemMsgJob.java
  17. 25 0
      blade-service/blade-manager/src/main/java/org/springblade/manager/mapper/SystemMsgMapper.java
  18. 40 0
      blade-service/blade-manager/src/main/java/org/springblade/manager/mapper/SystemMsgMapper.xml
  19. 27 0
      blade-service/blade-manager/src/main/java/org/springblade/manager/service/ISystemMsgService.java
  20. 151 0
      blade-service/blade-manager/src/main/java/org/springblade/manager/service/impl/SystemMsgServiceImpl.java
  21. 2 0
      blade-service/blade-meter/src/main/java/org/springblade/meter/feign/MeterWebSocketClientImpl.java
  22. 2 1
      blade-service/blade-meter/src/main/java/org/springblade/meter/service/impl/TaskRepealMessageServiceImpl.java
  23. 15 0
      blade-service/blade-websocket/pom.xml
  24. 42 0
      blade-service/blade-websocket/src/main/java/org/springblade/websocket/config/TaskPoolConfig.java
  25. 6 0
      blade-service/blade-websocket/src/main/java/org/springblade/websocket/feign/WebSocketClientImpl.java
  26. 192 38
      blade-service/blade-websocket/src/main/java/org/springblade/websocket/service/WebSocketService.java

+ 27 - 0
blade-common/src/main/java/org/springblade/common/constant/WebsocketMsgConstant.java

@@ -0,0 +1,27 @@
+package org.springblade.common.constant;
+
+/**
+ * @Param   推送系统消息时所使用的标识
+ * @Author wangwl
+ * @Date 2024/9/9 9:30
+ **/
+public interface WebsocketMsgConstant {
+
+    /**
+     * 平台统一标识
+     */
+    //维护发布公告
+    String MSG_UPDATE_MSG = "msgUpdateMsg";
+    //系统普通公告
+    String MSG_SYSTEM_MSG = "msgSystemMsg";
+    //维护倒计时
+    String MSG_COUNT_DOWN = "msgCountDown";
+    //客户端从服务器获取公告
+    String GET_MSG = "getMsg";
+
+    /**
+     * 单个系统标识
+     */
+    //计量-消息提醒
+    String MSG_REMIND = "msgRemind";
+}

+ 5 - 3
blade-service-api/blade-business-api/src/main/java/org/springblade/business/feign/BusinessWebSocketClient.java

@@ -2,9 +2,12 @@ package org.springblade.business.feign;
 
 
 import org.springblade.common.constant.BusinessConstant;
+import org.springblade.websocket.vo.UserInfoVO;
 import org.springframework.cloud.openfeign.FeignClient;
 import org.springframework.stereotype.Component;
 import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestParam;
 
 import java.util.Map;
@@ -13,7 +16,6 @@ import java.util.Map;
 @Component
 public interface BusinessWebSocketClient {
 
-    @GetMapping(value = "/business/getWebsocketMsg")
-    Map<String, String> getWebsocketMsg(@RequestParam String projectId, @RequestParam String contractId, @RequestParam String userIdResult);
-
+    @PostMapping(value = "/meter/pushMsg")
+    void pushMsg(@RequestBody UserInfoVO vo);
 }

+ 5 - 0
blade-service-api/blade-manager-api/pom.xml

@@ -13,6 +13,11 @@
     <name>${project.artifactId}</name>
     <version>${bladex.project.version}</version>
     <dependencies>
+        <dependency>
+            <groupId>org.springblade</groupId>
+            <artifactId>blade-websocket-api</artifactId>
+            <version>${bladex.project.version}</version>
+        </dependency>
         <dependency>
             <groupId>com.alibaba</groupId>
             <artifactId>fastjson</artifactId>

+ 90 - 0
blade-service-api/blade-manager-api/src/main/java/org/springblade/manager/entity/SystemMsg.java

@@ -0,0 +1,90 @@
+package org.springblade.manager.entity;
+
+import com.baomidou.mybatisplus.annotation.FieldFill;
+import com.baomidou.mybatisplus.annotation.TableField;
+import com.baomidou.mybatisplus.annotation.TableName;
+import com.fasterxml.jackson.annotation.JsonFormat;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.springblade.core.mp.base.BaseEntity;
+import org.springframework.format.annotation.DateTimeFormat;
+
+import javax.validation.constraints.NotNull;
+import java.time.LocalDateTime;
+
+
+/**
+ * @Param
+ * @Author wangwl
+ * @Date 2024/8/29 16:07
+ **/
+@Data
+@TableName("m_system_msg")
+@EqualsAndHashCode(callSuper = true)
+public class SystemMsg extends BaseEntity {
+
+    @ApiModelProperty(value = "公告类型:1维护公告2普通公告(会在推送倒计时时,临时借位为3)")
+    @NotNull(message = "请传入公告类型!")
+    private Integer msgType;
+
+    @DateTimeFormat(
+            pattern = "yyyy-MM-dd HH:mm:ss"
+    )
+    @JsonFormat(
+            pattern = "yyyy-MM-dd HH:mm:ss"
+    )
+    @ApiModelProperty(value = "发布时间")
+    @NotNull(message = "请传入发布时间!")
+    private LocalDateTime pushDateTime;
+
+    @ApiModelProperty(value = "更新服务类型字符串拼接:1前端2后端")
+    private String updateServerType;
+
+    @ApiModelProperty(value = "更新代码类型字符串拼接:1需求2BUG")
+    private String updateCodeType;
+
+    @ApiModelProperty(value = "推送系统:系统名称逗号拼接")
+    @NotNull(message = "请传入推送系统!")
+    private String pushSystem;
+
+    @ApiModelProperty(value = "发布内容")
+    private String msgContent;
+
+    @ApiModelProperty(value = "发布备注")
+    private String pushRemark;
+
+    @ApiModelProperty(value = "创建人名称")
+    private String creatUserName;
+
+    @ApiModelProperty(value = "公告显示时间分钟(用于普通公告)")
+    private Integer msgShowTime;
+
+    @ApiModelProperty(value = "公告结束显示时间(用于普通公告)")
+    private LocalDateTime pushEndDateTime;
+
+    @ApiModelProperty(value = "发布状态:1待发布,2已发布,3已取消")
+    private Integer pushStatus;
+
+    @DateTimeFormat(
+            pattern = "yyyy-MM-dd HH:mm:ss"
+    )
+    @JsonFormat(
+            pattern = "yyyy-MM-dd HH:mm:ss"
+    )
+    @ApiModelProperty(value = "取消时间")
+    @TableField(fill = FieldFill.INSERT_UPDATE)
+    private LocalDateTime cancelDateTime;
+
+    @ApiModelProperty(value = "公告提醒分钟(用于维护公告)")
+    private Integer msgWarnTime;
+
+    @ApiModelProperty(value = "公告开始提醒时间(用于维护公告)")
+    private LocalDateTime pushWarnDateTime;
+
+    @ApiModelProperty(value = "公告倒计时分钟(用于维护公告)")
+    private Integer msgCountDownTime;
+
+    @ApiModelProperty(value = "公告开始倒计时时间(用于维护公告)")
+    private LocalDateTime pushCountDownDateTime;
+}

+ 5 - 3
blade-service-api/blade-manager-api/src/main/java/org/springblade/manager/feign/ManagerWebSocketClient.java

@@ -1,9 +1,12 @@
 package org.springblade.manager.feign;
 
 import org.springblade.common.constant.LauncherConstant;
+import org.springblade.websocket.vo.UserInfoVO;
 import org.springframework.cloud.openfeign.FeignClient;
 import org.springframework.stereotype.Component;
 import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestParam;
 
 import java.util.Map;
@@ -12,7 +15,6 @@ import java.util.Map;
 @Component
 public interface ManagerWebSocketClient {
 
-    @GetMapping(value = "/manager/getWebsocketMsg")
-    Map<String, String> getWebsocketMsg(@RequestParam String projectId, @RequestParam String contractId, @RequestParam String userIdResult);
-
+    @PostMapping(value = "/meter/pushMsg")
+    void pushMsg(@RequestBody UserInfoVO vo);
 }

+ 39 - 0
blade-service-api/blade-manager-api/src/main/java/org/springblade/manager/vo/SystemAwaitMsgVO.java

@@ -0,0 +1,39 @@
+package org.springblade.manager.vo;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.springblade.manager.entity.SystemMsg;
+
+import java.time.LocalDateTime;
+import java.util.List;
+
+/**
+ * @Param   系统待发布消息VO
+ * @Author wangwl
+ * @Date 2024/8/29 16:47
+ **/
+@Data
+public class SystemAwaitMsgVO {
+
+    @ApiModelProperty(value = "发布公告")
+    private List<MsgInfo> updateMsg;
+
+    @ApiModelProperty(value = "普通公告")
+    private List<MsgInfo> systemMsg;
+
+    @ApiModelProperty(value = "待发布数量")
+    private Integer msgTotal;
+
+    @Data
+    public static class MsgInfo{
+        @ApiModelProperty(value = "待发布数量")
+        private Integer AwaitMsgTotal;
+
+        @ApiModelProperty(value = "创建人名称")
+        private String creatUserName;
+
+        @ApiModelProperty(value = "发布时间")
+        private LocalDateTime pushDateTime;
+    }
+}

+ 19 - 0
blade-service-api/blade-manager-api/src/main/java/org/springblade/manager/vo/SystemMsgVO.java

@@ -0,0 +1,19 @@
+package org.springblade.manager.vo;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.springblade.manager.entity.SystemMsg;
+
+/**
+ * @Param
+ * @Author wangwl
+ * @Date 2024/8/29 16:47
+ **/
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class SystemMsgVO extends SystemMsg {
+
+    @ApiModelProperty(value = "公告类型名称")
+    private String msgTypeName;
+}

+ 11 - 1
blade-service-api/blade-websocket-api/src/main/java/org/springblade/websocket/feign/WebSocketClient.java

@@ -1,9 +1,9 @@
 package org.springblade.websocket.feign;
 
 import org.springblade.common.constant.LauncherConstant;
+import org.springblade.websocket.vo.SystMsgVO;
 import org.springblade.websocket.vo.UserInfoVO;
 import org.springframework.cloud.openfeign.FeignClient;
-import org.springframework.stereotype.Component;
 import org.springframework.web.bind.annotation.*;
 
 /**
@@ -13,6 +13,16 @@ import org.springframework.web.bind.annotation.*;
  **/
 @FeignClient(value = LauncherConstant.APPLICATION_WEBSOCKET_NAME)
 public interface WebSocketClient {
+    /**
+     * 推送一个用户的消息
+     * @param vo
+     */
     @PostMapping(value = "/socket/sendMsg")
     void sendMsg(@RequestBody UserInfoVO vo);
+
+    /**
+     * 推送系统公告
+     */
+    @PostMapping(value = "/socket/sendSystemMsg")
+    void sendSystemMsg(@RequestBody SystMsgVO vo);
 }

+ 5 - 1
blade-service-api/blade-websocket-api/src/main/java/org/springblade/websocket/vo/MsgVO.java

@@ -14,9 +14,13 @@ import java.io.Serializable;
 public class MsgVO<T> implements Serializable {
 
     /**
-     * 每个系统独立拥有消息类型
+     * 每个系统独立拥有客户端消息类型
+     *  全平台通用:msgUpdateMsg(维护发布公告),msgSystemMsg(系统普通公告),msgCountDown(维护倒计时)
      *  计量:msgRemind(消息提醒)
      *  档案:
+     *
+     * 服务器处理的消息类型
+     *  getMsg:代表客户端向服务器获取所有公告
      */
     @ApiModelProperty("消息类型")
     private String type;

+ 26 - 0
blade-service-api/blade-websocket-api/src/main/java/org/springblade/websocket/vo/SystMsgVO.java

@@ -0,0 +1,26 @@
+package org.springblade.websocket.vo;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+import java.io.Serializable;
+
+/**
+ * @Param
+ * @Author wangwl
+ * @Date 2024/8/30 14:02
+ **/
+@Data
+public class SystMsgVO implements Serializable {
+
+    @ApiModelProperty(value = "公告类型:1维护公告2普通公告")
+    private Integer msgType;
+
+    @ApiModelProperty(value = "推送系统:系统名称逗号拼接")
+    private String pushSystem;
+
+    @ApiModelProperty(value = "发布内容")
+    private String msgContent;
+
+
+}

+ 3 - 0
blade-service-api/blade-websocket-api/src/main/java/org/springblade/websocket/vo/UserInfoVO.java

@@ -29,6 +29,9 @@ public class UserInfoVO implements Serializable {
     @ApiModelProperty("msg")
     private String msg;
 
+    @ApiModelProperty("公告类型1维护2普通,拼接字符串")
+    private String msgType;
+
     public UserInfoVO() {
     }
 

+ 42 - 0
blade-service-api/blade-websocket-api/src/main/java/org/springblade/websocket/vo/UserSingleVO.java

@@ -0,0 +1,42 @@
+package org.springblade.websocket.vo;
+
+import lombok.Data;
+
+import java.util.Objects;
+
+/**
+ * @Param   用于推送时只推送一次
+ * @Author wangwl
+ * @Date 2024/9/4 14:09
+ **/
+@Data
+public class UserSingleVO {
+
+    private Long userId;
+
+    private String userIp;
+
+    private String system;
+
+    public UserSingleVO() {
+    }
+
+    public UserSingleVO(Long userId, String userIp, String system) {
+        this.userId = userId;
+        this.userIp = userIp;
+        this.system = system;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        UserSingleVO singleVO = (UserSingleVO) o;
+        return Objects.equals(userId, singleVO.userId) && Objects.equals(userIp, singleVO.userIp) && Objects.equals(system, singleVO.system);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(userId, userIp, system);
+    }
+}

+ 7 - 8
blade-service/blade-business/src/main/java/org/springblade/business/feignClient/BusinessWebSocketClientImpl.java

@@ -1,25 +1,24 @@
 package org.springblade.business.feignClient;
 
 import lombok.AllArgsConstructor;
+import lombok.RequiredArgsConstructor;
 import org.springblade.business.feign.BusinessWebSocketClient;
 import org.springblade.business.service.ITaskService;
 import org.springblade.core.tenant.annotation.NonDS;
+import org.springblade.websocket.vo.UserInfoVO;
+import org.springframework.scheduling.annotation.Async;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.RestController;
 
 import java.util.Map;
 
 @RestController
-@AllArgsConstructor
-@NonDS
+@RequiredArgsConstructor
 public class BusinessWebSocketClientImpl implements BusinessWebSocketClient {
 
-    private final ITaskService iTaskService;
-
     @Override
-    @GetMapping(value = "/business/getWebsocketMsg")
-    public Map<String, String> getWebsocketMsg(String projectId, String contractId, String userIdResult) {
-        return iTaskService.getTaskCount(projectId, contractId, userIdResult);
-    }
+    @Async
+    public void pushMsg(UserInfoVO vo) {
 
+    }
 }

+ 185 - 0
blade-service/blade-manager/src/main/java/org/springblade/manager/controller/SystemMsgController.java

@@ -0,0 +1,185 @@
+package org.springblade.manager.controller;
+
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.github.xiaoymin.knife4j.annotations.ApiOperationSupport;
+import io.swagger.annotations.*;
+import lombok.AllArgsConstructor;
+import org.springblade.core.boot.ctrl.BladeController;
+import org.springblade.core.log.exception.ServiceException;
+import org.springblade.core.mp.support.Query;
+import org.springblade.core.secure.utils.AuthUtil;
+import org.springblade.core.tool.api.R;
+import org.springblade.core.tool.utils.Func;
+import org.springblade.manager.entity.AppVersion;
+import org.springblade.manager.entity.SystemMsg;
+import org.springblade.manager.service.IAppVersionService;
+import org.springblade.manager.service.ISystemMsgService;
+import org.springblade.manager.vo.SystemAwaitMsgVO;
+import org.springblade.manager.vo.SystemMsgVO;
+import org.springframework.jdbc.core.BeanPropertyRowMapper;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.web.bind.annotation.*;
+
+import javax.validation.Valid;
+import java.time.LocalDateTime;
+import java.time.temporal.ChronoField;
+import java.util.List;
+import java.util.stream.Collectors;
+
+
+@RestController
+@AllArgsConstructor
+@RequestMapping("/systemMsg")
+@Api(value = "后管系统公告", tags = "后管系统公告")
+public class SystemMsgController extends BladeController {
+
+    private final ISystemMsgService systemMsgService;
+
+    /**
+     * 新增
+     */
+    @PostMapping("/add")
+    @ApiOperationSupport(order = 1)
+    @ApiOperation(value = "新增", notes = "传入版本信息")
+    public R add(@Valid @RequestBody SystemMsg systemMsg){
+        systemMsg.setPushDateTime(systemMsg.getPushDateTime().with(ChronoField.SECOND_OF_MINUTE, 0));
+        //如果新增的是更新公告,则校验是否已经存在更新公告
+        if (systemMsg.getMsgType() == 1) {
+            List<SystemMsg> list = systemMsgService.list(new LambdaQueryWrapper<SystemMsg>()
+                    .eq(SystemMsg::getMsgType, 1)
+                    .eq(SystemMsg::getPushStatus,1)
+                    .gt(SystemMsg::getPushDateTime, LocalDateTime.now()));
+            if (list.size() > 0){
+                throw new ServiceException("当前已存在待发布的更新公告,请检查后再重新发布");
+            }
+            if (systemMsg.getMsgWarnTime() == null){
+                throw new ServiceException("请填写提醒时间");
+            }
+            if (systemMsg.getMsgCountDownTime() == null){
+                throw new ServiceException("请填写倒计时时间");
+            }
+            //更新公告,计算提醒开始时间,倒计时开始时间
+            systemMsg.setPushWarnDateTime(systemMsg.getPushDateTime().minusMinutes(systemMsg.getMsgWarnTime()));
+            systemMsg.setPushCountDownDateTime(systemMsg.getPushDateTime().minusMinutes(systemMsg.getMsgCountDownTime()));
+        }else {
+            //普通公告,则计算推送结束时间
+            if (systemMsg.getMsgShowTime() == null){
+                throw new ServiceException("请填写停留时间");
+            }
+            systemMsg.setPushEndDateTime(systemMsg.getPushDateTime().plusMinutes(systemMsg.getMsgShowTime()));
+        }
+        //校验公告时间不能小于当前
+        if (systemMsg.getPushDateTime().isBefore(LocalDateTime.now())){
+            throw new ServiceException("发布时间不能小于当前,请检查后再重新发布");
+        }
+        systemMsg.setCreatUserName(AuthUtil.getUserName());
+        systemMsgService.save(systemMsg);
+        return R.data("新增成功");
+    }
+
+    /**
+     * 分页
+     */
+    @GetMapping("/page")
+    @ApiOperationSupport(order = 2)
+    @ApiOperation(value = "分页", notes = "传入分页信息")
+    @ApiImplicitParams(value = {
+            @ApiImplicitParam(name = "current", value = "当前页", required = true),
+            @ApiImplicitParam(name = "size", value = "每页的数量", required = true),
+            @ApiImplicitParam(name = "msgType", value = "公告类型:1维护公告2普通公告", required = false),
+            @ApiImplicitParam(name = "pushStatus", value = "发布状态:1待发布,2已发布,3已取消", required = false)
+    })
+    public R<IPage<SystemMsgVO>> page(Query query, Integer msgType, Integer pushStatus){
+        IPage<SystemMsgVO> page = systemMsgService.page2(query,msgType,pushStatus);
+        return R.data(page);
+    }
+
+    /**
+     * 修改
+     */
+    @PostMapping("/update")
+    @ApiOperationSupport(order = 3)
+    @ApiOperation(value = "修改", notes = "传入版本信息")
+    public R update(@Valid @RequestBody SystemMsg systemMsg){
+        systemMsg.setPushDateTime(systemMsg.getPushDateTime().with(ChronoField.SECOND_OF_MINUTE, 0));
+        if (systemMsg.getMsgType() == 1) {
+            //如果修改的是更新公告,则校验是否已经存在更新公告
+            List<SystemMsg> list = systemMsgService.list(new LambdaQueryWrapper<SystemMsg>()
+                    .eq(SystemMsg::getMsgType, 1)
+                    .ne(SystemMsg::getId, systemMsg.getId())
+                    .gt(SystemMsg::getPushDateTime, LocalDateTime.now()));
+            if (list.size() > 0){
+                throw new ServiceException("当前已存在待发布的更新公告,请检查后再重新编辑发布");
+            }
+            if (systemMsg.getMsgWarnTime() == null){
+                throw new ServiceException("请填写提醒时间");
+            }
+            if (systemMsg.getMsgCountDownTime() == null){
+                throw new ServiceException("请填写倒计时时间");
+            }
+            //更新公告,计算提醒开始时间,倒计时开始时间
+            systemMsg.setPushWarnDateTime(systemMsg.getPushDateTime().minusMinutes(systemMsg.getMsgWarnTime()));
+            systemMsg.setPushCountDownDateTime(systemMsg.getPushDateTime().minusMinutes(systemMsg.getMsgCountDownTime()));
+        }else {
+            //普通公告,计算推送结束时间
+            if (systemMsg.getMsgShowTime() == null){
+                throw new ServiceException("请填写停留时间");
+            }
+            systemMsg.setPushEndDateTime(systemMsg.getPushDateTime().plusMinutes(systemMsg.getMsgShowTime()));
+        }
+        //校验公告时间不能小于当前
+        if (systemMsg.getPushDateTime().isBefore(LocalDateTime.now())){
+            throw new ServiceException("发布时间不能小于当前,请检查后再重新发布");
+        }
+        systemMsg.setCreatUserName(AuthUtil.getUserName());
+        systemMsg.setPushStatus(1);
+        systemMsg.setCancelDateTime(null);
+        systemMsgService.updateById(systemMsg);
+        return R.data("修改成功");
+    }
+
+    /**
+     * 取消发布
+     */
+    @GetMapping("/cancelPush")
+    @ApiOperationSupport(order = 4)
+    @ApiOperation(value = "取消发布", notes = "传入id")
+    public R cancelPush(@RequestParam Long id){
+        SystemMsg msg = systemMsgService.getById(id);
+        msg.setMsgContent(null);
+        systemMsgService.update(new LambdaUpdateWrapper<SystemMsg>()
+                .eq(SystemMsg::getId,id)
+                .set(SystemMsg::getPushStatus,3)
+                .set(SystemMsg::getCancelDateTime,LocalDateTime.now()));
+        systemMsgService.pushSystemMsgToAllUser(msg);
+        return R.data("修改成功");
+    }
+
+    /**
+     * 获取待发布公告
+     */
+    @GetMapping("/getAwaitMsg")
+    @ApiOperationSupport(order = 5)
+    @ApiOperation(value = "获取待发布公告", notes = "没有参数,返回两个数组:发布公告和系统公告")
+    public R<SystemAwaitMsgVO> getAwaitMsg(){
+        SystemAwaitMsgVO vo = systemMsgService.getAwaitMsg();
+        return R.data(vo);
+    }
+
+    /**
+     * 批量删除
+     */
+    @GetMapping("/remove")
+    @ApiOperationSupport(order = 7)
+    @ApiOperation(value = "批量删除", notes = "传入id逗号拼接")
+    public R remove(@RequestParam String ids) {
+        return R.status(systemMsgService.deleteLogic(Func.toLongList(ids)));
+    }
+
+
+}

+ 10 - 6
blade-service/blade-manager/src/main/java/org/springblade/manager/feign/ManagerWebSocketClientImpl.java

@@ -1,24 +1,28 @@
 package org.springblade.manager.feign;
 
 import lombok.AllArgsConstructor;
+import lombok.RequiredArgsConstructor;
 import org.springblade.business.feign.OpinionUserClient;
 import org.springblade.core.tenant.annotation.NonDS;
+import org.springblade.manager.service.ISystemMsgService;
+import org.springblade.websocket.vo.UserInfoVO;
+import org.springframework.scheduling.annotation.Async;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.RestController;
 
 import java.util.Map;
 
 @RestController
-@AllArgsConstructor
-@NonDS
+@RequiredArgsConstructor
 public class ManagerWebSocketClientImpl implements ManagerWebSocketClient {
 
-    private final OpinionUserClient opinionUserClient;
+    private final ISystemMsgService systemMsgService;
 
     @Override
-    @GetMapping(value = "/manager/getWebsocketMsg")
-    public Map<String, String> getWebsocketMsg(String projectId, String contractId, String userIdResult) {
-        return opinionUserClient.getWebsocketMsg(projectId, contractId, userIdResult);
+    @Async
+    public void pushMsg(UserInfoVO vo) {
+        //推送系统公告
+        systemMsgService.pushSystemMsg(vo);
     }
 
 }

+ 88 - 0
blade-service/blade-manager/src/main/java/org/springblade/manager/job/SystemMsgJob.java

@@ -0,0 +1,88 @@
+package org.springblade.manager.job;
+
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
+import lombok.AllArgsConstructor;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springblade.core.tool.api.R;
+import org.springblade.core.tool.jackson.JsonUtil;
+import org.springblade.manager.entity.SystemMsg;
+import org.springblade.manager.service.ISystemMsgService;
+import org.springblade.websocket.vo.MsgVO;
+import org.springblade.websocket.vo.SystMsgVO;
+import org.springframework.jdbc.core.BeanPropertyRowMapper;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * @Param   系统消息定时器管理类
+ * @Author wangwl
+ * @Date 2024/9/5 10:18
+ **/
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class SystemMsgJob {
+
+    private final ISystemMsgService systemMsgService;
+
+    private final JdbcTemplate jdbcTemplate;
+
+    /**
+     *  定时推送公告-取消公告-修改公告状态,不想污染日志,使用jdbc
+     */
+    @Scheduled(cron = "0 */1 * * * ?")
+    public void autoUpdateMsgStatus(){
+        /** 普通公告推送*/
+        String sql1 = "select * from m_system_msg where is_deleted = 0 and msg_type = 2 and push_status = 1 and push_date_time <= TIMESTAMPADD(SECOND, 1, NOW()) and push_end_date_time >= now()";
+        List<SystemMsg> list1 = jdbcTemplate.query(sql1,new BeanPropertyRowMapper<>(SystemMsg.class));
+        if (list1.size() > 0){
+            for (SystemMsg msg : list1) {
+                systemMsgService.pushSystemMsgToAllUser(msg);
+            }
+            systemMsgService.update(new LambdaUpdateWrapper<SystemMsg>()
+                    .set(SystemMsg::getPushStatus,2)
+                    .in(SystemMsg::getId,list1.stream().map(SystemMsg::getId).collect(Collectors.toList())));
+        }
+        /** 维护公告推送*/
+        String sql2 = "select * from m_system_msg where is_deleted = 0 and msg_type = 1 and status = 1 and push_status = 1 and push_date_time >= now() and push_warn_date_time <= TIMESTAMPADD(SECOND, 1, NOW())";
+        List<SystemMsg> list2 = jdbcTemplate.query(sql2,new BeanPropertyRowMapper<>(SystemMsg.class));
+        if (list2.size() > 0){
+            for (SystemMsg msg : list2) {
+                systemMsgService.pushSystemMsgToAllUser(msg);
+            }
+            systemMsgService.update(new LambdaUpdateWrapper<SystemMsg>()
+                    .set(SystemMsg::getStatus,2)
+                    .in(SystemMsg::getId,list2.stream().map(SystemMsg::getId).collect(Collectors.toList())));
+        }
+        /** 维护倒计时*/
+        String sql3 = "select * from m_system_msg where is_deleted = 0 and msg_type = 1 and status != 3 and TIMESTAMPADD(SECOND, 1, NOW()) >= push_count_down_date_time and now() < push_date_time ";
+        List<SystemMsg> list3 = jdbcTemplate.query(sql3,new BeanPropertyRowMapper<>(SystemMsg.class));
+        if (list3.size() > 0){
+            SystemMsg msg = list3.get(0);
+            msg.setMsgType(3);
+            msg.setMsgContent(Duration.between(LocalDateTime.now(), msg.getPushDateTime()).getSeconds()+"");
+            systemMsgService.pushSystemMsgToAllUser(msg);
+            systemMsgService.update(new LambdaUpdateWrapper<SystemMsg>()
+                    .set(SystemMsg::getStatus,3)
+                    .eq(SystemMsg::getId,msg.getId()));
+        }
+
+        /** 维护公告修改状态*/
+        //搜索出发布时间超过当前时间的待发布公告
+        String sql4 = "select * from m_system_msg where is_deleted = 0 and msg_type = 1 and push_status = 1 and push_date_time < now()";
+        List<SystemMsg> list4 = jdbcTemplate.query(sql4,new BeanPropertyRowMapper<>(SystemMsg.class));
+        //如果有数据,则修改为已发布
+        if (list4.size() > 0){
+            systemMsgService.update(new LambdaUpdateWrapper<SystemMsg>()
+                    .set(SystemMsg::getPushStatus,2)
+                    .in(SystemMsg::getId,list4.stream().map(SystemMsg::getId).collect(Collectors.toList())));
+        }
+    }
+}

+ 25 - 0
blade-service/blade-manager/src/main/java/org/springblade/manager/mapper/SystemMsgMapper.java

@@ -0,0 +1,25 @@
+package org.springblade.manager.mapper;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import org.apache.ibatis.annotations.Param;
+import org.springblade.manager.entity.SystemMsg;
+import org.springblade.manager.vo.SystemMsgVO;
+
+import java.util.List;
+
+/**
+ * @Param
+ * @Author wangwl
+ * @Date 2024/8/29 16:20
+ **/
+public interface SystemMsgMapper extends BaseMapper<SystemMsg> {
+    IPage<SystemMsgVO> page(IPage<SystemMsgVO> iPage, @Param("msgType") Integer msgType, @Param("pushStatus") Integer pushStatus);
+
+    List<SystemMsg> getAwaitMsg();
+
+
+    List<SystemMsg> getAwaitSystemMsg(@Param("system") String system);
+
+    SystemMsg getAwaitUpdateMsg(@Param("system") String system);
+}

+ 40 - 0
blade-service/blade-manager/src/main/java/org/springblade/manager/mapper/SystemMsgMapper.xml

@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="org.springblade.manager.mapper.SystemMsgMapper">
+
+
+    <select id="page" resultType="org.springblade.manager.vo.SystemMsgVO">
+        select *,
+               if(msg_type = 1,"系统发布公告","系统普通公告") as msgTypeName
+        from m_system_msg
+        where is_deleted = 0
+        <if test="msgType != null">
+            and msg_type = #{msgType}
+        </if>
+        <if test="pushStatus != null">
+            and push_status = #{pushStatus}
+        </if>
+        order by push_date_time desc
+    </select>
+    <select id="getAwaitMsg" resultType="org.springblade.manager.entity.SystemMsg">
+        select *
+        from m_system_msg
+        where is_deleted = 0 and push_status = 1
+        order by push_date_time desc
+    </select>
+    <select id="getAwaitSystemMsg" resultType="org.springblade.manager.entity.SystemMsg">
+        select *
+        from m_system_msg
+        where is_deleted = 0 and push_system like concat('%',#{system},'%')
+          and msg_type = 2 and push_date_time &lt;= now() and push_end_date_time &gt;= now()
+        order by push_date_time desc
+    </select>
+    <select id="getAwaitUpdateMsg" resultType="org.springblade.manager.entity.SystemMsg">
+        select *
+        from m_system_msg
+        where is_deleted = 0 and push_status = 1 and push_system like concat('%',#{system},'%')
+          and msg_type = 1 and push_date_time &gt;= now() and push_warn_date_time &lt;= now()
+        order by push_date_time desc
+            limit 1
+    </select>
+</mapper>

+ 27 - 0
blade-service/blade-manager/src/main/java/org/springblade/manager/service/ISystemMsgService.java

@@ -0,0 +1,27 @@
+package org.springblade.manager.service;
+
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import org.springblade.core.mp.base.BaseService;
+import org.springblade.core.mp.support.Query;
+import org.springblade.manager.entity.SystemMsg;
+import org.springblade.manager.vo.SystemAwaitMsgVO;
+import org.springblade.manager.vo.SystemMsgVO;
+import org.springblade.websocket.vo.UserInfoVO;
+
+/**
+ * @Param
+ * @Author wangwl
+ * @Date 2024/8/29 16:23
+ **/
+public interface ISystemMsgService extends BaseService<SystemMsg> {
+    IPage<SystemMsgVO> page2(Query query, Integer msgType, Integer pushStatus);
+
+    SystemAwaitMsgVO getAwaitMsg();
+
+    //用户登录推送系统消息
+    void pushSystemMsg(UserInfoVO vo);
+
+    //后管发布时推送给选择系统的所有用户
+    void pushSystemMsgToAllUser(SystemMsg SystemMsg);
+
+}

+ 151 - 0
blade-service/blade-manager/src/main/java/org/springblade/manager/service/impl/SystemMsgServiceImpl.java

@@ -0,0 +1,151 @@
+package org.springblade.manager.service.impl;
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import lombok.AllArgsConstructor;
+import org.springblade.common.constant.WebsocketMsgConstant;
+import org.springblade.core.mp.base.BaseServiceImpl;
+import org.springblade.core.mp.support.Query;
+import org.springblade.core.tool.api.R;
+import org.springblade.core.tool.jackson.JsonUtil;
+import org.springblade.manager.entity.SystemMsg;
+import org.springblade.manager.mapper.SystemMsgMapper;
+import org.springblade.manager.service.ISystemMsgService;
+import org.springblade.manager.vo.SystemAwaitMsgVO;
+import org.springblade.manager.vo.SystemMsgVO;
+import org.springblade.meter.entity.TaskRepealMessage;
+import org.springblade.websocket.feign.WebSocketClient;
+import org.springblade.websocket.vo.MsgVO;
+import org.springblade.websocket.vo.SystMsgVO;
+import org.springblade.websocket.vo.UserInfoVO;
+import org.springframework.jdbc.core.BeanPropertyRowMapper;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Service;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * @Param
+ * @Author wangwl
+ * @Date 2024/8/29 16:25
+ **/
+@Service
+@AllArgsConstructor
+public class SystemMsgServiceImpl extends BaseServiceImpl<SystemMsgMapper, SystemMsg> implements ISystemMsgService {
+
+    private final WebSocketClient webSocketClient;
+
+    private final JdbcTemplate jdbcTemplate;
+    @Override
+    public IPage<SystemMsgVO> page2(Query query, Integer msgType, Integer pushStatus) {
+        IPage<SystemMsgVO> iPage = new Page<>(query.getCurrent(), query.getSize());
+        iPage = baseMapper.page(iPage, msgType, pushStatus);
+        return iPage;
+    }
+
+    @Override
+    public SystemAwaitMsgVO getAwaitMsg() {
+        //获取出系统中所有待发布的公告
+        List<SystemMsg> list = baseMapper.getAwaitMsg();
+        SystemAwaitMsgVO vo = new SystemAwaitMsgVO();
+        List<SystemAwaitMsgVO.MsgInfo> updateMsg = new ArrayList<>();
+        List<SystemAwaitMsgVO.MsgInfo> systemMsg = new ArrayList<>();
+        updateMsg.add(setMsgInfo(list.stream().filter(l -> l.getMsgType() == 1 && l.getUpdateServerType().contains("1")).collect(Collectors.toList())));
+        updateMsg.add(setMsgInfo(list.stream().filter(l -> l.getMsgType() == 1 && l.getUpdateServerType().contains("2")).collect(Collectors.toList())));
+        systemMsg.add(setMsgInfo(list.stream().filter(l -> l.getMsgType() == 2).collect(Collectors.toList())));
+        vo.setUpdateMsg(updateMsg);
+        vo.setSystemMsg(systemMsg);
+        vo.setMsgTotal(list.size());
+        return vo;
+    }
+
+    /**
+     * 推送系统消息
+     * @param vo
+     */
+    @Override
+    @Async
+    public void pushSystemMsg(UserInfoVO vo) {
+        try {
+            Thread.sleep(10L);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+        String msgType = vo.getMsgType();
+        if (msgType.contains("2")) {
+            //获取当前系统普通公告
+            List<SystemMsg> systemMsgs = baseMapper.getAwaitSystemMsg(vo.getSystem());
+            if (systemMsgs.size() > 0) {
+                for (SystemMsg systemMsg : systemMsgs) {
+                    MsgVO<String> msgVO = new MsgVO<>();
+                    msgVO.setType(WebsocketMsgConstant.MSG_SYSTEM_MSG);
+                    msgVO.setData(systemMsg.getMsgContent());
+                    String msg = JsonUtil.toJson(R.data(msgVO));
+                    vo.setMsg(msg);
+                    webSocketClient.sendMsg(vo);
+                }
+            }
+        }
+        if (msgType.contains("1")) {
+            //获取当前系统维护公告
+            SystemMsg updateMsg = baseMapper.getAwaitUpdateMsg(vo.getSystem());
+            if (updateMsg != null) {
+                MsgVO<String> msgVO = new MsgVO<>();
+                msgVO.setType(WebsocketMsgConstant.MSG_UPDATE_MSG);
+                msgVO.setData(updateMsg.getMsgContent());
+                String msg = JsonUtil.toJson(R.data(msgVO));
+                vo.setMsg(msg);
+                webSocketClient.sendMsg(vo);
+            }
+        }
+        /** 查看是否正在倒计时*/
+        /** 维护倒计时*/
+        if (msgType.contains("3")) {
+            String sql3 = "select * from m_system_msg where is_deleted = 0 and msg_type = 1 and TIMESTAMPADD(SECOND, 1, NOW()) >= push_count_down_date_time and now() < push_date_time ";
+            List<SystemMsg> list3 = jdbcTemplate.query(sql3, new BeanPropertyRowMapper<>(SystemMsg.class));
+            if (list3.size() > 0) {
+                SystemMsg countDownMsg = list3.get(0);
+                MsgVO<String> msgVO = new MsgVO<>();
+                msgVO.setType(WebsocketMsgConstant.MSG_COUNT_DOWN);
+                msgVO.setData(Duration.between(LocalDateTime.now(), countDownMsg.getPushDateTime()).getSeconds() + "");
+                String msg = JsonUtil.toJson(R.data(msgVO));
+                vo.setMsg(msg);
+                webSocketClient.sendMsg(vo);
+            }
+        }
+
+    }
+
+    /**
+     * 后管发布时推送给选择系统的所有用户
+     * @param msg
+     */
+    @Override
+    @Async
+    public void pushSystemMsgToAllUser(SystemMsg msg) {
+        SystMsgVO systMsgVO = new SystMsgVO();
+        systMsgVO.setMsgType(msg.getMsgType());
+        systMsgVO.setPushSystem(msg.getPushSystem());
+        systMsgVO.setMsgContent(msg.getMsgContent());
+        webSocketClient.sendSystemMsg(systMsgVO);
+    }
+
+    private SystemAwaitMsgVO.MsgInfo setMsgInfo(List<SystemMsg> list) {
+        SystemAwaitMsgVO.MsgInfo msgInfo = new SystemAwaitMsgVO.MsgInfo();
+        if (list.size() > 0) {
+            msgInfo.setAwaitMsgTotal(1);
+            msgInfo.setCreatUserName(list.get(0).getCreatUserName());
+            msgInfo.setPushDateTime(list.get(0).getPushDateTime());
+        } else {
+            msgInfo.setAwaitMsgTotal(0);
+        }
+        return msgInfo;
+    }
+}

+ 2 - 0
blade-service/blade-meter/src/main/java/org/springblade/meter/feign/MeterWebSocketClientImpl.java

@@ -3,6 +3,7 @@ package org.springblade.meter.feign;
 import lombok.AllArgsConstructor;
 import org.springblade.meter.service.ITaskRepealMessageService;
 import org.springblade.websocket.vo.UserInfoVO;
+import org.springframework.scheduling.annotation.Async;
 import org.springframework.web.bind.annotation.RestController;
 
 
@@ -12,6 +13,7 @@ public class MeterWebSocketClientImpl implements MeterWebSocketClient {
 
     private final ITaskRepealMessageService repealMessageService;
     @Override
+    @Async
     public void pushMsg(UserInfoVO vo) {
         //推送废除通知
         repealMessageService.pushUnreadMessage(vo);

+ 2 - 1
blade-service/blade-meter/src/main/java/org/springblade/meter/service/impl/TaskRepealMessageServiceImpl.java

@@ -21,6 +21,7 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import lombok.AllArgsConstructor;
 import org.apache.commons.lang.StringUtils;
+import org.springblade.common.constant.WebsocketMsgConstant;
 import org.springblade.core.log.exception.ServiceException;
 import org.springblade.core.mp.base.BaseServiceImpl;
 import org.springblade.core.secure.utils.AuthUtil;
@@ -113,7 +114,7 @@ public class TaskRepealMessageServiceImpl extends BaseServiceImpl<TaskRepealMess
                 //1为未读,2为已读
                 .eq(TaskRepealMessage::getStatus, 1));
         MsgVO<Integer> msgVO = new MsgVO<>();
-        msgVO.setType("msgRemind");
+        msgVO.setType(WebsocketMsgConstant.MSG_REMIND);
         msgVO.setData(Math.toIntExact(count));
         String msg = JsonUtil.toJson(R.data(msgVO));
         vo.setMsg(msg);

+ 15 - 0
blade-service/blade-websocket/pom.xml

@@ -25,6 +25,21 @@
             <artifactId>blade-meter-api</artifactId>
             <version>${bladex.project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.springblade</groupId>
+            <artifactId>blade-archive-api</artifactId>
+            <version>${bladex.project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springblade</groupId>
+            <artifactId>blade-manager-api</artifactId>
+            <version>${bladex.project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springblade</groupId>
+            <artifactId>blade-business-api</artifactId>
+            <version>${bladex.project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.springblade</groupId>
             <artifactId>blade-core-boot</artifactId>

+ 42 - 0
blade-service/blade-websocket/src/main/java/org/springblade/websocket/config/TaskPoolConfig.java

@@ -0,0 +1,42 @@
+package org.springblade.websocket.config;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.util.concurrent.Executor;
+
+/**
+ * 线程池参数配置
+ **/
+@EnableAsync
+@Configuration
+public class TaskPoolConfig {
+
+    /**
+     * 自定义线程池
+     **/
+    @Bean("WebsocketExecutor")
+    public Executor taskExecutor() {
+        //返回可用处理器的Java虚拟机的数量 12
+        int i = Runtime.getRuntime().availableProcessors();
+        System.out.println("系统最大线程数  : " + i);
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+        //核心线程池大小
+        executor.setCorePoolSize(5);
+        //最大线程数
+        executor.setMaxPoolSize(10);
+        //配置队列容量,默认值为Integer.MAX_VALUE
+        executor.setQueueCapacity(20);
+        // 设置线程活跃时间(当超过了核心线程出的核心线程的存活时间,并且没有任务)
+        executor.setKeepAliveSeconds(60);
+        //线程名字前缀
+        executor.setThreadNamePrefix("websocketExecutor -");
+        //设置此执行程序应该在关闭时阻止的最大秒数,以便在容器的其余部分继续关闭之前等待剩余的任务完成他们的执行
+//        executor.setAwaitTerminationSeconds(60);
+        //等待所有的任务结束后再关闭线程池
+//        executor.setWaitForTasksToCompleteOnShutdown(true);
+        return executor;
+    }
+}

+ 6 - 0
blade-service/blade-websocket/src/main/java/org/springblade/websocket/feign/WebSocketClientImpl.java

@@ -2,6 +2,7 @@ package org.springblade.websocket.feign;
 
 import lombok.AllArgsConstructor;
 import org.springblade.websocket.service.WebSocketService;
+import org.springblade.websocket.vo.SystMsgVO;
 import org.springblade.websocket.vo.UserInfoVO;
 import org.springframework.web.bind.annotation.RestController;
 
@@ -19,4 +20,9 @@ public class WebSocketClientImpl implements WebSocketClient{
     public void sendMsg(UserInfoVO vo) {
         socketService.sendMessage(vo);
     }
+
+    @Override
+    public void sendSystemMsg(SystMsgVO vo) {
+        socketService.sendSystemMsg(vo);
+    }
 }

+ 192 - 38
blade-service/blade-websocket/src/main/java/org/springblade/websocket/service/WebSocketService.java

@@ -1,72 +1,114 @@
 package org.springblade.websocket.service;
 
+import io.undertow.websockets.jsr.UndertowSession;
 import lombok.Data;
-import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springblade.business.feign.BusinessWebSocketClient;
 import org.springblade.common.constant.ClientIdConstant;
+import org.springblade.common.constant.WebsocketMsgConstant;
 import org.springblade.core.tool.api.R;
 import org.springblade.core.tool.jackson.JsonUtil;
-import org.springblade.core.tool.utils.SpringUtil;
+import org.springblade.feign.ArchiveWebSocketClient;
+import org.springblade.manager.feign.ManagerWebSocketClient;
 import org.springblade.meter.feign.MeterWebSocketClient;
 import org.springblade.websocket.entity.WebSocketClientInfo;
 import org.springblade.websocket.vo.MsgVO;
+import org.springblade.websocket.vo.SystMsgVO;
 import org.springblade.websocket.vo.UserInfoVO;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
 import org.springframework.stereotype.Component;
-
 import javax.websocket.*;
 import javax.websocket.server.PathParam;
 import javax.websocket.server.ServerEndpoint;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.*;
+import java.util.concurrent.*;
 import java.util.concurrent.locks.ReentrantLock;
 
 @ServerEndpoint(value = "/websocket/{system}/{projectId}/{contractId}/{userId}")
 @Component
 @Data
-public class WebSocketService {
+public class WebSocketService implements ApplicationContextAware {
+
+    @Autowired
+    @Qualifier("WebsocketExecutor")
+    private Executor WebsocketExecutor;
 
-    private static MeterWebSocketClient meterWebSocketClient = null;
+    /** 计量*/
+    private static MeterWebSocketClient meterWebSocketClient;
+    /** 质检*/
+    private static BusinessWebSocketClient businessWebSocketClient;
+    /** 档案*/
+    private static ArchiveWebSocketClient archiveWebSocketClient;
+    /** 后台*/
+    private static ManagerWebSocketClient managerWebSocketClient;
+
+    @Override
+    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+        WebSocketService.meterWebSocketClient = applicationContext.getBean(MeterWebSocketClient.class);
+        WebSocketService.businessWebSocketClient = applicationContext.getBean(BusinessWebSocketClient.class);
+        WebSocketService.archiveWebSocketClient = applicationContext.getBean(ArchiveWebSocketClient.class);
+        WebSocketService.managerWebSocketClient = applicationContext.getBean(ManagerWebSocketClient.class);
+    }
 
     private static final Logger log = LoggerFactory.getLogger(WebSocketService.class);
 
     /**静态变量,用来记录当前在线用户数。应该把它设计成线程安全的。*/
     private static int onlineUserCount = 0;
+
     /**静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。*/
     private static int onlineLinkCount = 0;
+
     /**concurrent包的线程安全Set,用来存放每个客户端对应的WebSocketServer对象*/
     private static ConcurrentHashMap<Long, Map<String, WebSocketClientInfo>> webSocketMap = new ConcurrentHashMap<>();
+
     /**concurrent包的线程安全Set,用来存放每个用户所在合同对应的session对象*/
-    private static ConcurrentHashMap<UserInfoVO, List<Session>> userInfoMap = new ConcurrentHashMap<>();
+    private static ConcurrentHashMap<UserInfoVO, CopyOnWriteArrayList<Session>> userInfoMap = new ConcurrentHashMap<>();
+
+    /** 计量系统session集合*/
+    private static Set<Session> measureSystemSessions = new CopyOnWriteArraySet<>();
+    /** 质检系统session集合*/
+    private static Set<Session> clientSystemSessions = new CopyOnWriteArraySet<>();
+    /** 档案系统session集合*/
+    private static Set<Session> archivesSystemSessions = new CopyOnWriteArraySet<>();
+
+//    /** 更新公告推送记录*/
+//    private static Set<UserSingleVO> updateMsgPushUsers = new HashSet<>();
+//    /** 普通公告推送记录*/
+//    private static Set<UserSingleVO> systemMsgPushUsers = new HashSet<>();
 
     /**锁的映射表*/
     private static final ConcurrentHashMap<Long, ReentrantLock> locks = new ConcurrentHashMap<>();
 
     /**与某个客户端的连接会话,需要通过它来给客户端发送数据*/
     private Session session;
-    /**接收userId*/
+    /**用户id*/
     private Long userId;
+    /**用户ip*/
+    private String userIp;
+    /**系统名称*/
+    private String system;
     /**用户系统项目合同信息*/
     private UserInfoVO vo;
 
-
     /**
      * 连接建立成功调用的方法
      */
     @OnOpen
     public void onOpen(Session session, @PathParam("system") String system, @PathParam("projectId") Long projectId,
                        @PathParam("contractId") Long contractId,@PathParam("userId") Long userId) {
-        if (meterWebSocketClient == null){
-            meterWebSocketClient = SpringUtil.getBean(MeterWebSocketClient.class);
-        }
         ReentrantLock lock = getLock(userId);
         lock.lock();
         this.session = session;
+        UndertowSession undertowSession = (UndertowSession) session;
+        this.userIp = undertowSession.getWebSocketChannel().getSourceAddress().getAddress().getHostAddress();
         this.userId= userId;
+        this.system = system;
         //用户的连接信息
         WebSocketClientInfo client = new WebSocketClientInfo();
         client.setSession(session);
@@ -83,25 +125,39 @@ public class WebSocketService {
         userMap.put(session.getId(),client);
         webSocketMap.put(userId, userMap);
         addOnlineLinkCount();
-        log.info("--------------------------------建立连接-----------------------------------------");
+        log.info("----------------------建立连接-------------------------------");
         log.info("用户连接:"+userId+",当前用户总连接数:"+userMap.size());
         log.info("当前在线人数为:" + getOnlineUserCount()+",当前总连接数:"+getOnlineLinkCount());
         try {
             sendMessage(JsonUtil.toJson(R.data(new MsgVO("msgLink","来自后台的反馈:连接成功"))));
             this.vo = new UserInfoVO(system,projectId,contractId,userId);
-//            int randomNumber = (int)(Math.random() * 11);
-//            Thread.sleep(randomNumber);
-            List<Session> sessions = userInfoMap.get(vo);
+            CopyOnWriteArrayList<Session> sessions = userInfoMap.get(vo);
             if (sessions == null){
-                sessions = new ArrayList<>();
+                sessions = new CopyOnWriteArrayList<>();
             }
             sessions.add(session);
             userInfoMap.put(vo,sessions);
+            /** 获取倒计时,如果当前有倒计时则推送*/
+            this.vo.setMsgType("3");
+            try {
+                managerWebSocketClient.pushMsg(vo);
+            }catch (Exception e){
+                log.info("用户:"+userId+",网络异常:"+e.getMessage());
+            }
             /** 向指定系统发送通知,如果当前用户有需要推送的消息则推送 */
             switch (system) {
                 case ClientIdConstant.METER_ID:
+                    measureSystemSessions.add(session);
                     meterWebSocketClient.pushMsg(vo);
                     break;
+                case ClientIdConstant.ARCHIVE_ID:
+                    archivesSystemSessions.add(session);
+                    break;
+                case ClientIdConstant.CLIENT_ID:
+                    clientSystemSessions.add(session);
+                    break;
+                default:
+                    log.error("未知的系统登录:"+system);
             }
         } catch (Exception e) {
             log.error("用户:"+userId+",网络异常:"+e.getMessage());
@@ -115,18 +171,27 @@ public class WebSocketService {
      */
     @OnClose
     public void onClose() {
-        //同一个userId,只能有一个线程操作
-//        try {
-//            Thread.sleep(10);
-//        } catch (InterruptedException e) {
-//            throw new RuntimeException(e);
-//        }
+        //同一个userId,只能有一个线程同时操作
         ReentrantLock lock = getLock(userId);
         lock.lock();
         try {
+            //先删除系统中存储的session
+            switch (system) {
+                case ClientIdConstant.METER_ID:
+                    measureSystemSessions.remove(session);
+                    break;
+                case ClientIdConstant.CLIENT_ID:
+                    clientSystemSessions.remove(session);
+                    break;
+                case ClientIdConstant.ARCHIVE_ID:
+                    archivesSystemSessions.remove(session);
+                    break;
+                default:
+                    log.error("未知的系统退出:"+system);
+            }
             Map<String, WebSocketClientInfo> userMap = webSocketMap.get(userId);
             if (userMap != null && userMap.size() > 0) {
-                log.info("--------------------------------断开连接---------------------------------------");
+                log.info("----------------------断开连接-----------------------------");
                 List<Session> sessions = userInfoMap.get(vo);
                 if (sessions != null && sessions.size() > 0){
                     sessions.remove(session);
@@ -157,10 +222,15 @@ public class WebSocketService {
     @OnMessage
     public void onMessage(String message, Session session) {
         log.info("收到用户消息:"+userId+",报文:"+message);
-        //可以群发消息
         //消息保存到数据库、redis
-        if(StringUtils.isNotBlank(message)){
-
+        if(WebsocketMsgConstant.GET_MSG.equals(message)){
+            /** 向系统公告发送通知,如果当前有公告则推送*/
+            try {
+                this.vo.setMsgType("12");
+                managerWebSocketClient.pushMsg(vo);
+            }catch (Exception e){
+                log.info("用户:"+userId+",网络异常:"+e.getMessage());
+            }
         }
     }
 
@@ -183,21 +253,29 @@ public class WebSocketService {
         }
     }
 
+    /**
+     *  推送指定的系统的所有在线用户
+     */
+    public void sendMessage(String msg,String system) throws Exception {
+        synchronized (session){
+            this.session.getBasicRemote().sendText(msg);
+        }
+    }
+
     /**
      * 向指定客户端推送指定用户消息
      */
     public void sendMessage(UserInfoVO vo2){
             List<Session> sessions = userInfoMap.get(vo2);
             if(sessions != null && sessions.size() > 0){
-                /** 此处不能使用迭代器,否则在推送消息时,当前用户又开启一个页面,集合发生变化迭代器会报错ConcurrentModificationException*/
-                /** 如果数组越界,则复制一份出来循环发送*/
-                for (int i = 0; i < sessions.size(); i++) {
-                    Session s = sessions.get(i);
+                /** 此处不能使用普通集合,否则在推送消息时,当前用户又开启一个页面,集合发生变化迭代器会报错ConcurrentModificationException*/
+                for (Session s : sessions) {
                     if (s == null){
                         continue;
                     }
                     RemoteEndpoint.Basic basicRemote = s.getBasicRemote();
-                    /** 推送消息时可能因为nginx配置的连接时间导致通道关闭,此时跳过推送给这个窗口*/
+                    /** 推送消息时可能因为nginx配置的连接时间,或者用户刚好断开连接,
+                     * 导致通道关闭,但是数组中使用的是快照,还存在这个连接,此时跳过推送给这个窗口*/
                     if (basicRemote != null){
                         try {
                             basicRemote.sendText(vo2.getMsg());
@@ -209,6 +287,84 @@ public class WebSocketService {
             }
     }
 
+    /**
+     * 推送系统公告
+     * @param vo
+     */
+    public void sendSystemMsg(SystMsgVO vo) {
+        log.info("------------------公告推送开始-------------------------");
+        Integer meterTotal = measureSystemSessions.size();
+        Integer clientTotal = clientSystemSessions.size();
+        Integer archivesTotal = archivesSystemSessions.size();
+        String pushSystem = vo.getPushSystem();
+        Integer msgType = vo.getMsgType();
+//        if (msgType == 1){
+//            updateMsgPushUsers.clear();
+//        }else {
+//            systemMsgPushUsers.clear();
+//        }
+        //处理消息类型
+        String msgTypeValue;
+        if (msgType == 1){
+            msgTypeValue = WebsocketMsgConstant.MSG_UPDATE_MSG;
+        }else if (msgType == 2){
+            msgTypeValue = WebsocketMsgConstant.MSG_SYSTEM_MSG;
+        }else {
+            msgTypeValue = WebsocketMsgConstant.MSG_COUNT_DOWN;
+        }
+        //提前转换消息格式
+        String msg = JsonUtil.toJson(R.data(new MsgVO(msgTypeValue,vo.getMsgContent())));
+        //异步推送各个系统,最后汇总统计
+        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(
+                () -> sendAssignSystemMsg(pushSystem.contains(ClientIdConstant.METER_ID) ? measureSystemSessions : new HashSet<>(),msg), WebsocketExecutor);
+        CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(
+                () -> sendAssignSystemMsg(pushSystem.contains(ClientIdConstant.CLIENT_ID) ? clientSystemSessions : new HashSet<>(),msg), WebsocketExecutor);
+        CompletableFuture<Integer> cf3 = CompletableFuture.supplyAsync(
+                () -> sendAssignSystemMsg(pushSystem.contains(ClientIdConstant.ARCHIVE_ID) ? archivesSystemSessions : new HashSet<>(),msg), WebsocketExecutor);
+
+        try {
+            CompletableFuture.allOf(cf1, cf2,cf3).get(10, TimeUnit.SECONDS);
+            String pushInfo = "推送系统{"+pushSystem+"}";
+            String pushResult = "推送结果"+"{计量在线"+meterTotal+"推送"+cf1.get()+",质检在线"+clientTotal+"推送"+cf2.get()+",档案在线"+archivesTotal+"推送"+cf3.get()+"}";
+            log.info(pushInfo);
+            log.info(pushResult);
+            log.info("------------------公告推送结束-------------------------");
+        } catch (TimeoutException e) {
+            log.error("推送公告超时,原因:"+e.getMessage());
+        } catch (Exception e){
+            log.error("推送公告报错,原因:"+e.getMessage());
+        }
+    }
+
+    /**
+     * 单个系统推送方法
+     */
+    public Integer sendAssignSystemMsg(Set<Session> sessions,String msg){
+        Integer total = 0;
+        if (sessions.size() > 0){
+            for (Session s : sessions) {
+                if (s == null){
+                    continue;
+                }
+                RemoteEndpoint.Basic basicRemote = s.getBasicRemote();
+                /** 推送消息时可能因为nginx配置的连接时间,或者用户刚好断开连接,
+                 * 导致通道关闭,但是数组中使用的是快照,还存在这个连接,此时跳过推送给这个窗口*/
+                if (basicRemote != null){
+                    try {
+                        basicRemote.sendText(msg);
+                        total++;
+                    } catch (IOException e) {
+                        log.error("推送公告失败,原因:"+e.getMessage());
+                    }
+                }
+            }
+        }
+        return total;
+    }
+
+
+
+
     /**
      *      获取锁的方法,如果锁不存在,则创建一个新的锁
       */
@@ -245,6 +401,4 @@ public class WebSocketService {
         WebSocketService.onlineLinkCount--;
     }
 
-
-
 }