Spring Boot 中使用 SseEmitter 实现服务器推送消息

Spring Boot 中使用 SseEmitter 实现服务器推送消息的功能,可以通过维护一个连接池来记录当前连接数。具体步骤如下:

  1. 定义一个 ConcurrentHashMap 用于保存连接池,key 为用户 id 或其它唯一标识符,value 为 SseEmitter 对象。
swiftCopy codeprivate final ConcurrentHashMap<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
  1. 当用户发起 SSE 连接请求时,创建一个 SseEmitter 对象,并将其存入连接池中。在 Spring MVC 中可以通过 @RequestMapping 注解和 ResponseBodyEmitterReturnValueHandler 实现:
lessCopy code@RequestMapping("/connect")
public SseEmitter connect(@RequestParam("userId") String userId) {
    SseEmitter sseEmitter = new SseEmitter();
    sseEmitterMap.put(userId, sseEmitter);
    return sseEmitter;
}
  1. 当连接断开时,从连接池中删除对应的 SseEmitter 对象。可以通过 @ExceptionHandler 注解和 ResponseBodyEmitterReturnValueHandler 实现:
javaCopy code@ExceptionHandler(IOException.class)
public void handleException(IOException ex) {
    // 从连接池中删除对应的 SseEmitter 对象
}
  1. 在需要知道当前连接数的地方,遍历连接池,统计 SseEmitter 对象的数量即可。
pythonCopy codeint connectionCount = sseEmitterMap.size();

需要注意的是,SseEmitter 对象在连接断开时需要从连接池中删除,否则会造成内存泄漏。另外,连接池的实现需要考虑并发情况下的线程安全性。

如果浏览器异常退出或者用户关闭了浏览器,连接池怎么处理

当浏览器异常退出或者用户关闭浏览器时,连接池需要及时将对应的 SseEmitter 对象从连接池中删除,以避免内存泄漏。

在 Spring Boot 中,可以使用 @ExceptionHandler 注解来捕获 SSE 连接异常,并在异常处理方法中进行 SseEmitter 对象的清理操作。具体实现步骤如下:

  1. 在控制器中定义一个 SseEmitter 对象,并将其存入连接池中:
lessCopy code@RequestMapping("/connect")
public SseEmitter connect(@RequestParam("userId") String userId) {
    SseEmitter sseEmitter = new SseEmitter();
    sseEmitterMap.put(userId, sseEmitter);
    return sseEmitter;
}
  1. 定义一个异常处理方法,使用 @ExceptionHandler 注解来指定需要捕获的异常类型(这里选择 IOException),并在方法中将对应的 SseEmitter 对象从连接池中删除:
javaCopy code@ExceptionHandler(IOException.class)
public void handleException(IOException ex) {
    // 遍历连接池,找到对应的 SseEmitter 对象并删除
    sseEmitterMap.values().removeIf(sseEmitter -> sseEmitter.equals(ex.getSseEmitter()));
}
  1. 在 SseEmitter 对象中,捕获客户端连接异常并将异常信息通过 onError 方法发送到客户端:
javascriptCopy codesseEmitter.onError(ex -> {
    // 在异常处理方法中删除对应的 SseEmitter 对象
    ex.printStackTrace();
    ex.setSseEmitter(sseEmitter);
    throw ex;
});

这样,在客户端异常退出或者关闭浏览器时,就会触发 SSE 连接异常,并在异常处理方法中将对应的 SseEmitter 对象从连接池中删除,从而保证连接池中的 SseEmitter 对象与客户端的连接状态保持一致,避免内存泄漏。

定义连接池为什么需要private final

定义连接池时将其定义为 private final 的原因是为了保证连接池的线程安全性和不可变性。

private 关键字表示连接池是私有的,只能在当前类中访问,不能在其他类中直接访问,从而避免了对连接池的直接访问和修改。

final 关键字表示连接池是不可变的,即连接池的引用不能被重新赋值,但是连接池内部的元素可以被修改。这种方式保证了连接池的线程安全性,因为线程安全的集合一般都是不可变的,即集合对象的引用不能被重新赋值,但是集合内部的元素可以被修改。

因此,将连接池定义为 private final 可以避免在多线程环境下对连接池的并发访问和修改,从而保证了连接池的线程安全性和不可变性。