Administrator
发布于 2024-10-08 / 8 阅读
0

Spring Boot 环境下 WebSocket 的简单实现

1.添加依赖

        <!-- Spring Boot WebSocket依赖 -->
<!--        <dependency>-->
<!--            <groupId>org.springframework.boot</groupId>-->
<!--            <artifactId>spring-boot-starter-websocket</artifactId>-->
<!--        </dependency>-->

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-websocket</artifactId>
            <version>5.3.22</version>
            <scope>compile</scope>
        </dependency>

2. 配置并启用 WebSocket 支持

/**
 * Spring configuration that turns on support for annotation-based
 * ({@code @ServerEndpoint}) WebSocket endpoints.
 */
@Configuration
public class WebsocketBeanConfig {

    /**
     * Exposes a {@link ServerEndpointExporter} bean.
     *
     * <p>NOTE(review): per the Spring documentation this exporter registers
     * {@code @ServerEndpoint}-annotated beans with the servlet container's
     * WebSocket runtime; it is presumably only needed when running on an
     * embedded container — confirm for WAR deployments.
     *
     * @return a new exporter instance
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}

3. 配置事件监听并集成用户验证

@Slf4j
@Component
@ServerEndpoint(value = "/ws/text/{token}")
public class WebSocketServerHandler {


    private static final CopyOnWriteArraySet<Session> sessions = new CopyOnWriteArraySet<>();

    private static final Map<Session, Long> clients = new ConcurrentHashMap<>();
    //    private Session session;
    private static final String HEARTBEAT_MESSAGE = "heartbeat";
    private static final long HEARTBEAT_INTERVAL = 5000; // 心跳间隔时间,单位为毫秒
    private final Map<Session, ScheduledFuture<?>> heartbeatTasks = new ConcurrentHashMap<>();


    // 收到消息
    @OnMessage
    public void onMessage(String message, Session session) throws IOException {
        log.info("[websocket] 收到消息:id={},message={}", session.getId(), message);
        JSONObject jsonObject = JSONObject.parseObject(message);
        if ("ping".equals(jsonObject.getString("type"))) {
            jsonObject.put("type", "pong");
            session.getAsyncRemote().sendText(String.valueOf(jsonObject));
            clients.put(session, System.currentTimeMillis());
            String token = session.getPathParameters().get("token");
            if (ObjectUtils.isEmpty(token)) {
                session.close();
            }
            GetServiceHandler.refreshToken(token);
        }


    }

    // 连接打开
    @OnOpen
    public void onOpen(Session session, EndpointConfig endpointConfig, @PathParam("token") String token) throws IOException {
        log.info("[websocket] 验证用户信息验证:id={}", session.getId());
        log.info("token:{}", token);
        //验证用户信息
        LoginUser loginUser = null;
        try {
            loginUser = GetServiceHandler.getLoginUser(token);
        } catch (Exception e) {
            JSONObject error = new JSONObject();
            error.put("type", "error");
            error.put("msg", "用户信息验证失败!");
            session.getAsyncRemote().sendText(error.toJSONString());
            session.close();
            return;
        }
        if (ObjectUtils.isEmpty(loginUser)) {
            log.info("[websocket] 用户信息验证失败:id={}", session.getId());
            JSONObject error = new JSONObject();
            error.put("type", "error");
            error.put("msg", "用户信息验证失败!");
            session.getAsyncRemote().sendText(error.toJSONString());
            session.close();
            return;
        }
        // 保存 session 到对象
        sessions.add(session);
        session.getPathParameters().put("token", token);
        log.info("[websocket] 新的连接:id={}", session.getId());
        log.info("token:{}", token);
        clients.put(session, System.currentTimeMillis());
        // 启动心跳任务
//        startHeartbeatCheck(session);
    }


    // 连接关闭
    @OnClose
    public void onClose(Session session, CloseReason closeReason) {
        log.info("[websocket] 连接断开:id={},reason={}", session.getId(), closeReason);
        sessions.remove(session);
        clients.remove(session);
//        stopHeartbeatCheck(session);
    }

    // 连接异常
    @OnError
    public void onError(Throwable throwable, Session session) throws IOException {
        log.info("[websocket] 连接异常:id={},throwable={}", session.getId(), throwable.getMessage());
        // 关闭连接。状态码为 UNEXPECTED_CONDITION(意料之外的异常)
        session.close(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, throwable.getMessage()));
        clients.remove(session);
    }


    /**
     * 群发消息
     *
     * @param message
     * @throws IOException
     */
    public static void sendMessage(String message) throws IOException {
        for (Session session : sessions) {
            session.getAsyncRemote().sendText(message);
        }
    }

    /**
     * 启动心跳检查
     *
     * @param session
     */
    private void startHeartbeatCheck(Session session) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
            try {
                session.getBasicRemote().sendText(HEARTBEAT_MESSAGE);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }, 0, HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);

        heartbeatTasks.put(session, future);
    }

    /**
     * 停止心跳检查
     *
     * @param session
     */
    private void stopHeartbeatCheck(Session session) {
        ScheduledFuture<?> future = heartbeatTasks.get(session);
        if (future != null) {
            future.cancel(true);
        }
        heartbeatTasks.remove(session);
    }

    @Scheduled(fixedRate = HEARTBEAT_INTERVAL)
    public void checkClientActivity() {
        long currentTime = System.currentTimeMillis();
        clients.forEach((session, lastActiveTime) -> {
            if (currentTime - lastActiveTime > HEARTBEAT_INTERVAL * 2) {
                // 发送心跳超时消息给客户端
                try {
//                    session.getBasicRemote().sendText("Heartbeat timeout");
                    JSONObject error = new JSONObject();
                    error.put("type", "failed");
                    error.put("msg", "心跳超时!");
                    session.getBasicRemote().sendText(error.toJSONString());
                } catch (IOException e) {
                    e.printStackTrace();
                }
                //超时断开连接
                try {
                    session.close();
                } catch (IOException e) {
                    e.printStackTrace();
                    throw new RuntimeException(e);
                }
                clients.remove(session);
            }
        });
    }


}
/**
 * Static bridge that gives code without dependency injection access to the
 * {@code TokenService} bean, which is looked up through {@code SpringUtil} on
 * first use and cached afterwards.
 */
public class GetServiceHandler {

    /**
     * Cached service reference. Declared {@code volatile} because it is read
     * outside the lock in {@link #getTokenService()}; without it the
     * unsynchronized read is not guaranteed to see a safely published value.
     */
    private static volatile TokenService tokenService;


    /**
     * Resolves the user that belongs to a token.
     *
     * @param token the token to look up
     * @return whatever {@code TokenService.getLoginUser(token)} returns
     */
    public static LoginUser getLoginUser(String token) {
        return getTokenService().getLoginUser(token);
    }

    /**
     * Returns the cached {@code TokenService}, fetching it from the Spring
     * context under a class-level lock the first time it is needed.
     */
    private static TokenService getTokenService() {
        // Read the volatile field once into a local to avoid repeated volatile reads.
        TokenService service = tokenService;
        if (service == null) {
            synchronized (GetServiceHandler.class) {
                service = tokenService;
                if (service == null) {
                    service = SpringUtil.getBean(TokenService.class);
                    tokenService = service;
                }
            }
        }
        return service;
    }


    /**
     * Looks up the user for the token and passes it to
     * {@code TokenService.verifyToken}.
     *
     * <p>NOTE(review): the looked-up user is forwarded as-is; if the lookup can
     * return {@code null} for an expired token, {@code verifyToken} must cope
     * with that — confirm against {@code TokenService}.
     *
     * @param token the token whose login state should be refreshed
     */
    public static void refreshToken(String token) {
        TokenService service = getTokenService();
        LoginUser loginUser = service.getLoginUser(token);
        service.verifyToken(loginUser);
    }

}