封装 WebSocket
2024/9/20大约 5 分钟
WebSocket 封装类,提供了连接管理、消息收发、断线重连和心跳检测等功能。
WebSocket 封装类
实现代码
/**
* WebSocket 封装类
* 提供连接管理、消息收发、断线重连和心跳检测功能
*/
class WebSocketClient {
/**
* 创建 WebSocket 客户端实例
* @param {string} url - WebSocket 服务器地址
* @param {Object} options - 配置选项
* @param {number} options.reconnectAttempts - 最大重连次数(默认5次)
* @param {number} options.reconnectInterval - 重连间隔时间(默认1000毫秒)
* @param {number} options.heartbeatInterval - 心跳检测间隔(默认30000毫秒)
* @param {string} options.heartbeatMessage - 心跳消息(默认'ping')
* @param {Array} options.protocols - WebSocket 子协议
*/
constructor(url, options = {}) {
this.url = url;
this.options = {
reconnectAttempts: 5,
reconnectInterval: 1000,
heartbeatInterval: 30000,
heartbeatMessage: "ping",
protocols: [],
...options,
};
this.ws = null;
this.reconnectTimer = null;
this.heartbeatTimer = null;
this.reconnectCount = 0;
this.isManualClose = false;
this.listeners = new Map();
// 初始化连接
this.connect();
}
/**
* 建立 WebSocket 连接
*/
connect() {
try {
this.ws = new WebSocket(this.url, this.options.protocols);
this.ws.onopen = (event) => {
this.handleOpen(event);
};
this.ws.onmessage = (event) => {
this.handleMessage(event);
};
this.ws.onerror = (error) => {
this.handleError(error);
};
this.ws.onclose = (event) => {
this.handleClose(event);
};
} catch (error) {
console.error("WebSocket 连接失败:", error);
this.emit("error", error);
this.attemptReconnect();
}
}
/**
* 发送消息
* @param {string|Object} data - 要发送的数据
* @returns {boolean} 是否发送成功
*/
send(data) {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
try {
const message = typeof data === "object" ? JSON.stringify(data) : data;
this.ws.send(message);
return true;
} catch (error) {
console.error("WebSocket 发送消息失败:", error);
this.emit("error", error);
return false;
}
} else {
console.warn("WebSocket 连接未建立,无法发送消息");
return false;
}
}
/**
* 手动关闭 WebSocket 连接
*/
close() {
this.isManualClose = true;
this.stopHeartbeat();
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
if (this.ws) {
this.ws.close();
this.ws = null;
}
this.emit("close", { manual: true });
}
/**
* 注册事件监听器
* @param {string} event - 事件名称
* @param {Function} callback - 回调函数
*/
on(event, callback) {
if (!this.listeners.has(event)) {
this.listeners.set(event, []);
}
this.listeners.get(event).push(callback);
}
/**
* 移除事件监听器
* @param {string} event - 事件名称
* @param {Function} callback - 要移除的回调函数,不传则移除所有该事件的监听器
*/
off(event, callback) {
if (!this.listeners.has(event)) return;
if (callback) {
const callbacks = this.listeners.get(event);
const index = callbacks.indexOf(callback);
if (index > -1) {
callbacks.splice(index, 1);
}
} else {
this.listeners.delete(event);
}
}
/**
* 触发事件
* @param {string} event - 事件名称
* @param {*} data - 事件数据
*/
emit(event, data) {
if (!this.listeners.has(event)) return;
const callbacks = this.listeners.get(event);
callbacks.forEach((callback) => {
try {
callback(data);
} catch (error) {
console.error(`WebSocket 事件 ${event} 处理错误:`, error);
}
});
}
/**
* 处理连接打开事件
* @private
* @param {Event} event - 连接打开事件对象
*/
handleOpen(event) {
console.log("WebSocket 连接已建立");
this.reconnectCount = 0;
this.emit("open", event);
this.startHeartbeat();
}
/**
* 处理消息接收事件
* @private
* @param {MessageEvent} event - 消息事件对象
*/
handleMessage(event) {
try {
// 尝试解析 JSON 消息
let data = event.data;
if (typeof data === "string") {
try {
data = JSON.parse(data);
} catch (e) {
// 不是 JSON 格式,保持原样
}
}
// 处理心跳响应
if (
data === this.options.heartbeatMessage ||
(typeof data === "object" && data.type === "pong")
) {
return;
}
this.emit("message", data);
} catch (error) {
console.error("WebSocket 消息处理失败:", error);
this.emit("error", error);
}
}
/**
* 处理错误事件
* @private
* @param {Event} error - 错误事件对象
*/
handleError(error) {
console.error("WebSocket 错误:", error);
this.emit("error", error);
}
/**
* 处理连接关闭事件
* @private
* @param {CloseEvent} event - 连接关闭事件对象
*/
handleClose(event) {
console.log("WebSocket 连接已关闭", event.code, event.reason);
this.stopHeartbeat();
this.ws = null;
if (!this.isManualClose) {
this.attemptReconnect();
}
this.emit("close", event);
}
/**
* 尝试重新连接
* @private
*/
attemptReconnect() {
if (this.reconnectCount >= this.options.reconnectAttempts) {
console.error("WebSocket 重连失败,已达到最大重连次数");
this.emit("reconnect_failed");
return;
}
this.reconnectCount++;
console.log(`WebSocket 尝试第 ${this.reconnectCount} 次重连...`);
this.reconnectTimer = setTimeout(() => {
this.connect();
}, this.options.reconnectInterval * Math.pow(2, this.reconnectCount - 1)); // 指数退避策略
}
/**
* 开始心跳检测
* @private
*/
startHeartbeat() {
this.stopHeartbeat();
this.heartbeatTimer = setInterval(() => {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.send(this.options.heartbeatMessage);
}
}, this.options.heartbeatInterval);
}
/**
* 停止心跳检测
* @private
*/
stopHeartbeat() {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
}
/**
* 获取当前连接状态
* @returns {number|null} WebSocket 连接状态码,或 null 表示未初始化
*/
get readyState() {
return this.ws ? this.ws.readyState : null;
}
/**
* 获取连接状态的可读描述
* @returns {string} 连接状态描述
*/
get stateDescription() {
if (!this.ws) return "未初始化";
switch (this.ws.readyState) {
case WebSocket.CONNECTING:
return "连接中";
case WebSocket.OPEN:
return "已连接";
case WebSocket.CLOSING:
return "关闭中";
case WebSocket.CLOSED:
return "已关闭";
default:
return "未知状态";
}
}
}
代码解析
核心功能模块:
- 连接管理:建立、关闭连接,处理各种连接状态事件
- 消息处理:发送和接收消息,支持自动解析 JSON 格式
- 断线重连:实现指数退避重连策略,可配置最大重连次数和间隔
- 心跳检测:定期发送心跳消息,保持连接活跃
- 事件系统:提供自定义事件监听机制
主要方法解析:
connect()
: 建立 WebSocket 连接并设置各种事件处理器send(data)
: 发送消息,支持字符串和对象格式close()
: 手动关闭连接,停止所有定时器on/off/emit()
: 事件监听和触发系统attemptReconnect()
: 实现指数退避的重连策略startHeartbeat()/stopHeartbeat()
: 管理心跳检测机制
关键设计点:
- 使用类封装,便于实例化和复用
- 提供丰富的配置选项,适应不同场景需求
- 实现错误处理和日志记录,便于调试
- 通过状态管理避免重复连接和资源浪费
- 指数退避重连策略,减少服务器压力
使用方法
// 创建 WebSocket 客户端实例
const wsClient = new WebSocketClient("ws://localhost:8080/ws", {
reconnectAttempts: 5, // 最大重连次数
reconnectInterval: 1000, // 初始重连间隔(毫秒)
heartbeatInterval: 30000, // 心跳间隔(毫秒)
heartbeatMessage: "ping", // 心跳消息内容
});
// 监听连接建立事件
wsClient.on("open", () => {
console.log("WebSocket 连接成功");
// 连接成功后发送消息
wsClient.send({ type: "login", data: { username: "user123" } });
});
// 监听消息接收事件
wsClient.on("message", (data) => {
console.log("收到消息:", data);
// 根据消息类型处理不同的业务逻辑
if (data.type === "notification") {
handleNotification(data);
} else if (data.type === "data_update") {
updateData(data);
}
});
// 监听连接关闭事件
wsClient.on("close", (event) => {
console.log("连接关闭:", event);
});
// 监听错误事件
wsClient.on("error", (error) => {
console.error("发生错误:", error);
});
// 监听重连失败事件
wsClient.on("reconnect_failed", () => {
console.error("WebSocket 重连失败,请检查网络连接");
// 可以在这里提示用户手动重连
});
// 发送消息的示例函数
function sendChatMessage(content) {
wsClient.send({
type: "chat",
data: {
content: content,
timestamp: new Date().toISOString(),
},
});
}
// 应用退出时关闭连接
window.addEventListener("beforeunload", () => {
wsClient.close();
});
// 获取连接状态示例
function getConnectionStatus() {
console.log(`当前连接状态: ${wsClient.stateDescription}`);
return wsClient.readyState;
}
更新日志
2025/9/28 09:03
查看所有更新日志
38e56
-于