1.添加依赖
<!-- Spring Boot WebSocket依赖 -->
<!-- <dependency>-->
<!-- <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>spring-boot-starter-websocket</artifactId>-->
<!-- </dependency>-->
<!--
  Declares the plain spring-websocket artifact (version 5.3.22, compile scope)
  directly, instead of the spring-boot-starter-websocket starter.
  NOTE(review): an explicitly pinned version may diverge from the Spring version
  managed by the parent POM / BOM - confirm that 5.3.22 matches the rest of the project.
-->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-websocket</artifactId>
<version>5.3.22</version>
<scope>compile</scope>
</dependency>
2.配置并启用 WebSocket 支持
/**
 * Spring configuration that contributes the {@link ServerEndpointExporter} bean.
 *
 * <p>Per the Spring Framework documentation, the exporter detects beans annotated
 * with {@code @ServerEndpoint} and registers them with the standard Java WebSocket
 * runtime.
 */
@Configuration
public class WebsocketBeanConfig {

    /**
     * Creates the exporter bean.
     *
     * @return a new {@link ServerEndpointExporter} instance
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
3.配置监听事件及集成用户验证
/**
 * WebSocket endpoint mounted at {@code /ws/text/{token}}.
 *
 * <p>Responsibilities visible in this class:
 * <ul>
 *   <li>validate the {@code token} path parameter when a connection opens;</li>
 *   <li>answer client {@code {"type":"ping"}} frames with a {@code pong} and record the
 *       time of the last ping per session;</li>
 *   <li>periodically close sessions whose last ping is older than two heartbeat
 *       intervals;</li>
 *   <li>broadcast text messages to every registered session.</li>
 * </ul>
 *
 * <p>{@link #sessions} and {@link #clients} are static, so they are shared by every
 * instance of this class.
 *
 * <p>NOTE(review): {@link #checkClientActivity()} relies on {@code @Scheduled}; scheduling
 * must be enabled elsewhere in the application - confirm.
 */
@Slf4j
@Component
@ServerEndpoint(value = "/ws/text/{token}")
public class WebSocketServerHandler {

    /** Every session that passed token validation and has not been closed yet. */
    private static final CopyOnWriteArraySet<Session> sessions = new CopyOnWriteArraySet<>();

    /** Session -> timestamp (epoch millis) of its open event or most recent ping. */
    private static final Map<Session, Long> clients = new ConcurrentHashMap<>();

    /** Payload pushed by the (currently unused) server-side heartbeat task. */
    private static final String HEARTBEAT_MESSAGE = "heartbeat";

    /** Heartbeat interval in milliseconds. */
    private static final long HEARTBEAT_INTERVAL = 5000;

    /**
     * Upper bound, in characters, for a close-reason phrase. The WebSocket API limits the
     * phrase to 123 UTF-8 bytes; 40 UTF-16 chars can never encode to more than 120 bytes.
     */
    private static final int MAX_CLOSE_REASON_CHARS = 40;

    /**
     * Single shared scheduler for server-side heartbeat tasks. One daemon thread serves
     * every session, instead of one never-terminated executor per session.
     */
    private static final ScheduledExecutorService HEARTBEAT_EXECUTOR =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "ws-heartbeat");
                thread.setDaemon(true);
                return thread;
            });

    /** Heartbeat tasks started through {@link #startHeartbeatCheck(Session)}. */
    private final Map<Session, ScheduledFuture<?>> heartbeatTasks = new ConcurrentHashMap<>();

    /**
     * Handles an incoming text frame. Only JSON objects whose {@code type} is {@code ping}
     * are acted upon: a {@code pong} is sent back, the session's last-active time is
     * updated and the token from the connection path is refreshed.
     *
     * @param message raw text frame received from the client
     * @param session session the frame arrived on
     * @throws IOException if closing the session fails
     */
    @OnMessage
    public void onMessage(String message, Session session) throws IOException {
        log.info("[websocket] 收到消息:id={},message={}", session.getId(), message);

        JSONObject jsonObject;
        try {
            jsonObject = JSONObject.parseObject(message);
        } catch (RuntimeException e) {
            // A malformed frame is ignored rather than tearing down the connection.
            log.warn("[websocket] ignoring unparseable message: id={}", session.getId(), e);
            return;
        }
        // parseObject may yield null (e.g. for a blank frame); nothing to do in that case.
        if (jsonObject == null || !"ping".equals(jsonObject.getString("type"))) {
            return;
        }

        jsonObject.put("type", "pong");
        session.getAsyncRemote().sendText(jsonObject.toJSONString());
        clients.put(session, System.currentTimeMillis());

        String token = session.getPathParameters().get("token");
        if (ObjectUtils.isEmpty(token)) {
            session.close();
            // Without this return the empty token would still be passed to refreshToken.
            return;
        }
        GetServiceHandler.refreshToken(token);
    }

    /**
     * Validates the token of a newly opened connection. On failure an error frame is sent
     * and the session is closed; on success the session is registered for broadcasts and
     * for the activity check.
     *
     * <p>The token itself is deliberately not logged because it is a credential.
     *
     * @param session        newly opened session
     * @param endpointConfig endpoint configuration supplied by the container (unused)
     * @param token          value of the {@code token} path parameter
     * @throws IOException if closing a rejected session fails
     */
    @OnOpen
    public void onOpen(Session session, EndpointConfig endpointConfig, @PathParam("token") String token) throws IOException {
        log.info("[websocket] 验证用户信息验证:id={}", session.getId());

        LoginUser loginUser = null;
        try {
            loginUser = GetServiceHandler.getLoginUser(token);
        } catch (Exception e) {
            // Treated the same as "no user found" below, but the cause is kept in the log.
            log.warn("[websocket] token lookup failed: id={}", session.getId(), e);
        }

        if (ObjectUtils.isEmpty(loginUser)) {
            log.info("[websocket] 用户信息验证失败:id={}", session.getId());
            rejectAndClose(session);
            return;
        }

        // The token is already available via session.getPathParameters(); that map is
        // documented as unmodifiable, so it is not written to here.
        sessions.add(session);
        clients.put(session, System.currentTimeMillis());
        log.info("[websocket] 新的连接:id={}", session.getId());
        // The server-side heartbeat push (startHeartbeatCheck) is currently not started;
        // liveness is tracked from client pings by checkClientActivity.
    }

    /**
     * Removes a closed session from the shared registries.
     *
     * @param session     session that was closed
     * @param closeReason reason reported by the container
     */
    @OnClose
    public void onClose(Session session, CloseReason closeReason) {
        log.info("[websocket] 连接断开:id={},reason={}", session.getId(), closeReason);
        sessions.remove(session);
        clients.remove(session);
    }

    /**
     * Handles a connection error: the session is unregistered and, if still open, closed
     * with {@code UNEXPECTED_CONDITION}.
     *
     * @param throwable error reported by the container
     * @param session   session the error occurred on
     * @throws IOException if closing the session fails
     */
    @OnError
    public void onError(Throwable throwable, Session session) throws IOException {
        log.warn("[websocket] 连接异常:id={},throwable={}", session.getId(), throwable.getMessage(), throwable);
        // Unregister first so the bookkeeping is cleaned up even if close() throws.
        sessions.remove(session);
        clients.remove(session);
        if (session.isOpen()) {
            session.close(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, closeReasonOf(throwable)));
        }
    }

    /**
     * Broadcasts a text message to every registered session that is still open. A failure
     * on one session is logged and does not prevent delivery to the others.
     *
     * @param message text to send
     * @throws IOException declared for backward compatibility with existing callers
     */
    public static void sendMessage(String message) throws IOException {
        for (Session session : sessions) {
            if (!session.isOpen()) {
                continue;
            }
            try {
                session.getAsyncRemote().sendText(message);
            } catch (RuntimeException e) {
                log.warn("[websocket] broadcast failed: id={}", session.getId(), e);
            }
        }
    }

    /**
     * Starts a periodic task that pushes {@link #HEARTBEAT_MESSAGE} to the session every
     * {@link #HEARTBEAT_INTERVAL} milliseconds. The task cancels itself once the session
     * is no longer open.
     *
     * @param session session to send heartbeats to
     */
    private void startHeartbeatCheck(Session session) {
        ScheduledFuture<?> future = HEARTBEAT_EXECUTOR.scheduleAtFixedRate(() -> {
            if (!session.isOpen()) {
                ScheduledFuture<?> self = heartbeatTasks.remove(session);
                if (self != null) {
                    // cancel(false): this runs on the task's own thread, no interrupt needed.
                    self.cancel(false);
                }
                return;
            }
            try {
                session.getBasicRemote().sendText(HEARTBEAT_MESSAGE);
            } catch (IOException | RuntimeException e) {
                // An exception escaping the task would silently stop all further runs.
                log.warn("[websocket] heartbeat send failed: id={}", session.getId(), e);
            }
        }, 0, HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
        heartbeatTasks.put(session, future);
    }

    /**
     * Cancels and forgets the heartbeat task of the given session, if any.
     *
     * @param session session whose heartbeat task should stop
     */
    private void stopHeartbeatCheck(Session session) {
        ScheduledFuture<?> future = heartbeatTasks.remove(session);
        if (future != null) {
            future.cancel(true);
        }
    }

    /**
     * Periodic sweep that closes sessions whose last recorded activity is older than two
     * heartbeat intervals. Each stale session is notified with a {@code failed} frame,
     * closed and unregistered; a failure on one session does not abort the sweep.
     */
    @Scheduled(fixedRate = HEARTBEAT_INTERVAL)
    public void checkClientActivity() {
        long currentTime = System.currentTimeMillis();
        clients.forEach((session, lastActiveTime) -> {
            if (currentTime - lastActiveTime <= HEARTBEAT_INTERVAL * 2) {
                return;
            }
            // Conditional remove: if a ping updated the timestamp in the meantime,
            // the entry no longer matches and the session is left alone.
            if (!clients.remove(session, lastActiveTime)) {
                return;
            }
            sessions.remove(session);

            try {
                if (session.isOpen()) {
                    JSONObject error = new JSONObject();
                    error.put("type", "failed");
                    error.put("msg", "心跳超时!");
                    session.getBasicRemote().sendText(error.toJSONString());
                }
            } catch (IOException | RuntimeException e) {
                log.warn("[websocket] failed to send heartbeat-timeout notice: id={}", session.getId(), e);
            }

            try {
                session.close();
            } catch (IOException | RuntimeException e) {
                // Logged instead of rethrown so the remaining sessions are still checked.
                log.warn("[websocket] failed to close timed-out session: id={}", session.getId(), e);
            }
        });
    }

    /** Sends the "user validation failed" error frame and closes the session. */
    private static void rejectAndClose(Session session) throws IOException {
        JSONObject error = new JSONObject();
        error.put("type", "error");
        error.put("msg", "用户信息验证失败!");
        session.getAsyncRemote().sendText(error.toJSONString());
        session.close();
    }

    /** Builds a close-reason phrase from the throwable: never null, length-bounded. */
    private static String closeReasonOf(Throwable throwable) {
        String reason = throwable.getMessage();
        if (reason == null) {
            return "";
        }
        return reason.length() > MAX_CLOSE_REASON_CHARS
                ? reason.substring(0, MAX_CLOSE_REASON_CHARS)
                : reason;
    }
}
/**
 * Static access to {@code TokenService} for classes that cannot have it injected.
 * The service is looked up through {@code SpringUtil.getBean} on first use and cached
 * in a static field afterwards.
 */
public class GetServiceHandler {

    /**
     * Cached service instance. Declared {@code volatile} because it is read outside the
     * lock in {@link #getTokenService()}; without it the unsynchronized read is not
     * guaranteed to observe a fully published instance.
     */
    private static volatile TokenService tokenService;

    /**
     * Looks up the login user for a token.
     *
     * @param token token to resolve
     * @return whatever {@code TokenService.getLoginUser(token)} returns
     */
    public static LoginUser getLoginUser(String token) {
        return getTokenService().getLoginUser(token);
    }

    /**
     * Returns the cached {@code TokenService}, fetching it from the Spring context the
     * first time it is needed. The lock on this class ensures a single initialization.
     */
    private static TokenService getTokenService() {
        TokenService service = tokenService;
        if (service == null) {
            synchronized (GetServiceHandler.class) {
                service = tokenService;
                if (service == null) {
                    service = SpringUtil.getBean(TokenService.class);
                    tokenService = service;
                }
            }
        }
        return service;
    }

    /**
     * Resolves the login user for the token and passes it to
     * {@code TokenService.verifyToken}.
     *
     * <p>NOTE(review): if {@code getLoginUser} can return {@code null} (e.g. for an
     * expired token), that {@code null} is handed to {@code verifyToken} unchanged -
     * confirm that {@code verifyToken} tolerates it.
     *
     * @param token token whose login user should be verified
     */
    public static void refreshToken(String token) {
        TokenService service = getTokenService();
        LoginUser loginUser = service.getLoginUser(token);
        service.verifyToken(loginUser);
    }
}