2025-03-29 14:35:49 +08:00

9.4 KiB
Raw Permalink Blame History

websocket

支持双向通信

使用场景:

  • 大屏展示
  • 日志监控
  • 即时聊天
  • 通知系统

服务端向客户端单向通讯: sse

语法

创建

const ws = new WebSocket(url[, protocols])
const uid = 123
const url = 'localhost:3000'
const ws = new WebSocket(`ws://${url}/ws/${uid}`)

属性

  • readyState: 链接状态
    • 0: 正在尝试建立连接
    • 1: 已链接, 可以通讯
    • 2: 正在关闭
    • 3: 已关闭或无法打开
  • url: 绝对路径

事件

通过 addEventListener() 监听

  • open
  • message
  • error
  • close
ws.addEventListener('open', handleOpen)
ws.addEventListener('message', handleMessage)
ws.addEventListener('error', handleError)
ws.addEventListener('close', handleClose)

方法

  • send
  • close
ws.send('hello')

ws.close()

示例

前端

<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="UTF-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
    <title>Document</title>
  </head>
  <body>
    <div class="status">
      <span>连接状态:</span>
      <span id="status">未连接</span>
    </div>
    <div class="close">
      <button id="close">关闭连接</button>
    </div>
    <div class="reconnect">
      <button id="reconnect">重新连接</button>
    </div>
    <div>
      <input type="text" id="message" />
      <button id="send">发送</button>
    </div>
    <div class="messages">
      <ul id="messages"></ul>
    </div>
    <script>
      const uid = 123
      const url = 'localhost:3003'
      let ws = null // 添加全局 WebSocket 变量
      let isCloseByHand = false // 是否手动关闭

      // 定义事件处理函数
      const handleOpen = e => {
        console.log(e)
        console.log('连接成功')
        document.getElementById('status').innerText = '已连接'
        console.log('readyState:', ws.readyState)
      }

      const handleMessage = e => {
        console.log(e)
        console.log('收到消息')
        const messages = document.getElementById('messages')
        messages.innerHTML += `<li>${e.data}</li>`
      }

      const handleError = e => {
        console.log(e)
        console.log('连接失败')
        document.getElementById('status').innerText = '连接失败'
      }

      const handleClose = e => {
        console.log(e)
        console.log('连接关闭')
        document.getElementById('status').innerText = '连接关闭'
        // 自动重连
        console.log('isCloseByHand:',isCloseByHand)
        if (!isCloseByHand) {
          let retryFn = handleRetry()
          retryFn()
          document.getElementById('status').innerText = '正在重连'
        }
      }

      // 自动重连
      const handleRetry = () => {
        console.log('进入重连')
        let timeId = null
        let count = 0
        let maxRestry = 10
        let fn = () => {
          timeId = setInterval(() => {
            if (ws.readyState == 1 || count >= maxRestry) {
              console.log('检测到连接成功或重试次数大于10,断开定时器')
              clearInterval(timeId)
              timeId = null
              return
            }
            console.log('正在重连...')
            wsInit()
            count++
          }, 10*1000)
        }
        return fn
      }

      const wsInit = () => {
        // 如果已存在连接,先关闭它
        if (ws) {
          ws.close()
          // 移除所有旧的事件监听器
          ws.removeEventListener('open', handleOpen)
          ws.removeEventListener('message', handleMessage)
          ws.removeEventListener('error', handleError)
          ws.removeEventListener('close', handleClose)
        }
        // 创建新的 WebSocket 连接
        ws = new WebSocket(`ws://${url}/ws/${uid}`)

        isCloseByHand = false

        if (ws.readyState !== 0) return

        // 添加事件监听器
        ws.addEventListener('open', handleOpen)
        ws.addEventListener('message', handleMessage)
        ws.addEventListener('error', handleError)
        ws.addEventListener('close', handleClose)
      }

      // 将 DOM 事件监听器移到外面,只绑定一次
      document.getElementById('send').addEventListener('click', () => {
        if (ws && ws.readyState === 1) {
          const message = document.getElementById('message').value
          ws.send(message)
        } else {
          console.log('WebSocket 未连接')
        }
      })

      document.getElementById('close').addEventListener('click', () => {
        if (ws) {
          ws.close()
          isCloseByHand = true
        }
      })

      document.getElementById('reconnect').addEventListener('click', () => {
        wsInit()
      })

      // 初始化连接
      wsInit()
    </script>
  </body>
</html>

后端

'use strict'

const fastify = require('fastify')()

// 注册 WebSocket 插件
fastify.register(require('@fastify/websocket'), {
  options: { maxPayload: 1048576 },
})

// 添加连接映射表
const connections = new Map()

// 注册 WebSocket 路由
fastify.register(async function (fastify) {
  fastify.get('/ws/:uid', { websocket: true }, (socket, req) => {
    const userId = req.params.uid
    console.log(`用户 ${userId} 已连接`)

    // 存储连接
    connections.set(userId, socket)

    socket.send(`欢迎用户 ${userId}`)

    socket.on('message', message => {
      console.log(`收到来自用户 ${userId} 的消息:`, message.toString())
      socket.send(`服务器已收到消息: ${message}`)
    })

    socket.on('close', () => {
      console.log(`用户 ${userId} 断开连接`)
      connections.delete(userId)
      socket.close()
    })
  })
  fastify.get('/send', (req, reply) => {
    const { msg, uid } = req.query
    if (!msg || !uid) {
      return reply.code(400).send({
        success: false,
        message: '缺少必要参数,请确保提供 msg 和 uid',
        example: '/send?msg=你的消息&uid=用户ID',
      })
    }
    console.log(`尝试发送消息给用户 ${uid}: ${msg}`)

    const userSocket = connections.get(uid)
    if (userSocket && userSocket.readyState === 1) {
      userSocket.send(msg)
      reply.send({ success: true, message: `消息已发送给用户 ${uid}: ${msg}` })
    } else {
      reply.code(404).send({
        success: false,
        message: `用户 ${uid} 不在线或连接已断开`,
      })
    }
  })
})

// 直接在外部注册 `send` 路由

fastify.listen({ port: 3003, host: '0.0.0.0' }, err => {
  if (err) {
    fastify.log.error(err)
    process.exit(1)
  }
  console.log('WebSocket 服务器运行在端口 3003')
})

fastify.ready(err => {
  // 打印所有路由
  if (err) throw err
  console.log(fastify.printRoutes())
})

SSE Server-Sent Events

  1. 客户端请求客户端通过发送一个HTTP请求到服务器请求建立SSE连接。这个请求通常是一个GET请求,并且包含一个特殊的Accept头,表明客户端希望接收text/event-stream类型的数据。
  2. 服务器响应:服务器接收到请求后,会保持连接打开,并开始发送事件流。服务器发送的数据格式是 text/event-stream,每个事件由一行或多行文本组成,以 \n\n(两个换行符)结束。
  3. 事件格式:每个事件可以包含以下字段:
    • data::事件的数据内容,可以是一行或多行。
    • id::事件的唯一标识符,用于断线重连时指定从哪个事件开始接收。
    • event::事件的类型,客户端可以根据这个字段来处理不同类型的事件。
    • retry::指定客户端在连接断开后重新连接的时间间隔(毫秒)。
  4. 客户端处理:客户端通过EventSource对象来接收和处理服务器发送的事件。EventSource会自动处理连接的重连和事件的解析。

服务端

const http = require('http');

http.createServer((req, res) => {
    if (req.url === '/events') {
        res.writeHead(200, {
            'Content-Type': 'text/event-stream',
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            'Access-Control-Allow-Origin': '*'
        });

        let eventId = 0;

        setInterval(() => {
            res.write(`id: ${eventId}\n`);
            res.write(`data: ${JSON.stringify({ message: 'Hello, world!', time: new Date() })}\n\n`);
            eventId++;
        }, 1000);
    } else {
        res.writeHead(404);
        res.end();
    }
}).listen(3000, () => {
    console.log('Server running at http://localhost:3000');
});

客户端

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>SSE Example</title>
</head>
<body>
    <h1>Server-Sent Events Example</h1>
    <div id="messages"></div>

    <script>
        const eventSource = new EventSource('http://localhost:3000/events');

        eventSource.onmessage = function(event) {
            const data = JSON.parse(event.data);
            const messageElement = document.createElement('div');
            messageElement.textContent = `Message: ${data.message}, Time: ${data.time}`;
            document.getElementById('messages').appendChild(messageElement);
        };

        eventSource.onerror = function(event) {
            console.error('EventSource failed:', event);
        };
    </script>
</body>
</html>