欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.csdn.net

在这里插入图片描述

前言

在现代 App 开发中,实时通信(Real-time Communication)已成为标配。无论是社交聊天的由“推”变“拉”,还是股票行情的毫秒级跳动,亦或是智能家居的状态同步,传统的 HTTP 轮询(Polling)已无法满足低延迟、高并发的需求。

WebSocket 协议应运而生。它基于 TCP,但在握手阶段利用 HTTP 升级协议(Upgrade Header),成功后建立全双工(Full-Duplex)的长连接。在这条通道上,客户端和服务端可以随时互相推送数据,且头部开销极小。

在 Flutter 生态中,虽然 dart:io 提供了原生的 WebSocket 类,dart:html 提供了浏览器的 WebSocket API,但两者的接口定义完全不同(一个是 Stream,一个是 EventTarget)。为了实现 “Write Once, Run Everywhere”,Google 官方推出了 web_socket_channel

它不仅抹平了移动端(Android/iOS/OpenHarmony)与 Web 端的差异,还提供了一套基于 StreamChannel 的统一接口。对于鸿蒙开发者而言,掌握这个库,意味着你无需关心底层是基于系统 Socket 还是基于浏览器 API,直接面向“流”编程即可。

一、核心原理与架构解析

1.1 协议握手流程

理解 WebSocket,首先要理解它是如何建立的。

  1. Client Hello: 客户端发送一个标准的 HTTP GET 请求,带上 Connection: UpgradeUpgrade: websocket 头。
  2. Server Agree: 如果服务器支持,返回 HTTP 101 Switching Protocols 响应。
  3. Connection Established: 此后,TCP 连接保持打开,双方通过二进制帧(Frame)传输数据。

web_socket_channel 在底层帮我们处理了这些复杂的握手逻辑:

  • IOWebSocketChannel:
    • 在 Android/iOS/OpenHarmony 上,它调用 dart:ioWebSocket.connect()
    • 底层依赖 Dart VM 的 Native Socket 实现,通过 C++ 层面的 OS_Socket 与操作系统内核交互。
  • HtmlWebSocketChannel:
    • 在 Flutter Web 上,它调用浏览器的 new WebSocket() JS API。
    • 底层由浏览器内核(Chromium/WebKit)管理连接。

1.2 StreamChannel 设计哲学

Dart 的异步模型建立在 FutureStream 之上。web_socket_channel 并没有重新发明轮子,而是遵循了 StreamChannel 规范:

  • Sink (输入): 你通过 channel.sink.add(msg) 发送数据。这就像往水管里倒水,你只管倒,数据会自动流向服务器。
  • Stream (输出): 你通过 channel.stream.listen() 接收数据。服务器每发来一条消息,就会触发一次回调。

添加数据

序列化

网络传输

响应数据

监听回调

Flutter 应用

输入流 (Sink)

WebSocket 帧

Server

输出流 (Stream)

1.3 状态码与关闭帧

不同于 HTTP 的 200/404,WebSocket 有自己的一套关闭码:

  • 1000 (Normal Closure): 正常关闭(如用户点击退出)。
  • 1001 (Going Away): 终端离开(如页面跳转)。
  • 1006 (Abnormal): 异常断开(如拔网线),这是最常见但也最难捕获的错误。

OpenHarmony 注意事项:在鸿蒙系统上,当 App 进入后台(Background)被挂起时,Socket 连接可能会因为系统为了省电而断开。此时不仅不会收到 1000,甚至可能连 1006 都没有,直接就是 StreamonDone 或抛出 SocketException

二、核心 API 详解与进阶配置

2.1 基础连接与配置

最简单的连接只需要一行代码:

import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/status.dart' as status;

final channel = WebSocketChannel.connect(
  Uri.parse('wss://echo.websocket.events'),
);

但在企业级项目中,我们需要更多控制:

import 'package:web_socket_channel/io.dart';
import 'dart:io';

// 仅在 Mobile/Desktop/OpenHarmony 使用 IOWebSocketChannel
final channel = IOWebSocketChannel.connect(
  Uri.parse('wss://api.myserver.com/ws'),
  headers: {
    'Authorization': 'Bearer <token>', // 鉴权 Token
    'X-Client-Version': '1.0.0', // 客户端版本
  },
  pingInterval: const Duration(seconds: 5), // 自动发送心跳 Ping
  connectTimeout: const Duration(seconds: 10), // 连接超时
);

关键参数解析

  • pingInterval: 这是一个极其实用的参数。设置后,Dart 客户端会自动每隔 5 秒发送一个 Ping 帧。如果服务器在一定时间内没有回复 Pong,客户端会判定连接断开并报错。这比自己写 Timer 心跳要高效得多。

在这里插入图片描述

2.2 发送数据 (Text vs Binary)

WebSocket 支持文本(UTF-8 String)和二进制(List)传输。

// 1. 发送 JSON 字符串
channel.sink.add(jsonEncode({'type': 'login', 'uid': 123}));

// 2. 发送二进制 (如 Protobuf 或 图片数据)
final List<int> bytes = [0x01, 0x02, 0x03];
channel.sink.add(bytes);

在这里插入图片描述

2.3 异常处理与资源释放

这是新手最容易踩坑的地方。Stream 的监听必须处理 onErroronDone

channel.stream.listen(
  (data) {
    // 处理正常消息
    print('Received: $data');
  },
  onError: (error) {
    // 处理网络异常 (如 SocketException: Connection refused)
    print('WS Error: $error');
  },
  onDone: () {
    // 处理连接关闭 (无论正常还是异常,最终都会走到这里)
    print('WS Closed');
    // 这里可以触发重连逻辑
  },
  cancelOnError: true, // 遇到错误是否取消订阅,建议 true
);

// 主动关闭
channel.sink.close(status.goingAway);

三、生产环境实战:打造永不掉线的 WebSocketClient

直接使用 WebSocketChannel 还是太“裸”了。在生产环境中,我们需要处理 断线重连心跳保活消息队列 等复杂问题。

3.1 架构设计

我们将封装一个 WebSocketService

  1. Reconnection Strategy: 指数退避算法(Exponential Backoff)。第一次失败等 1s,第二次等 2s,第三次等 4s… 直到上限。
  2. Heartbeat: 应用层心跳(Ping/Pong),防止“半开连接”(连接死掉但双方都不知道)。
  3. State Management: 通过 Stream 对外暴露 Connected, Connecting, Disconnected 状态。

3.2 核心代码实现

import 'dart:async';
import 'dart:convert';
import 'dart:math';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/status.dart' as status;

enum ConnectionStatus { connecting, connected, disconnected }

class WebSocketService {
  WebSocketChannel? _channel;
  final StreamController<ConnectionStatus> _statusController = StreamController.broadcast();
  final StreamController<dynamic> _messageController = StreamController.broadcast();
  
  Timer? _reconnectTimer;
  bool _isDisposed = false;
  int _retryCount = 0;
  
  Stream<ConnectionStatus> get status => _statusController.stream;
  Stream<dynamic> get messages => _messageController.stream;

  final String url;
  
  WebSocketService(this.url);

  void connect() {
    if (_isDisposed) return;
    
    _statusController.add(ConnectionStatus.connecting);
    print('Connecting to $url...');

    try {
      _channel = WebSocketChannel.connect(Uri.parse(url));
      
      _channel!.stream.listen(
        (data) {
          _retryCount = 0; // 重置重连计数
          if (_statusController.last != ConnectionStatus.connected) {
             _statusController.add(ConnectionStatus.connected);
          }
          _messageController.add(data);
        },
        onError: (e) {
          print('WS Error: $e');
          _reconnect();
        },
        onDone: () {
          print('WS Done');
          _reconnect();
        },
      );
    } catch (e) {
      print('Connect Exception: $e');
      _reconnect();
    }
  }

  void _reconnect() {
    if (_isDisposed) return;
    _statusController.add(ConnectionStatus.disconnected);

    // 指数退避:每次等待时间 2^n 秒,最大 30s
    final delay = min(30, pow(2, _retryCount++).toInt()); 
    print('Scheduling reconnect in ${delay}s...');

    _reconnectTimer?.cancel();
    _reconnectTimer = Timer(Duration(seconds: delay), () {
      print('Retrying connection...');
      connect();
    });
  }

  void send(String msg) {
    if (_channel != null && _channel!.closeCode == null) {
      _channel!.sink.add(msg);
    } else {
      print('Cannot send: not connected');
      // 可选:加入待发送队列
    }
  }

  void dispose() {
    _isDisposed = true;
    _reconnectTimer?.cancel();
    _channel?.sink.close(status.goingAway);
    _statusController.close();
    _messageController.close();
  }
}

在这里插入图片描述

四、OpenHarmony 平台适配指南

在鸿蒙系统上使用 WebSocket,有一些特有的约束和最佳实践。

4.1 网络权限配置

这是最基础的。所有的网络请求都需要权限。
entry/src/main/module.json5 中确保声明:

"requestPermissions": [
  {
    "name": "ohos.permission.INTERNET"
  }
]

4.2 后台保活与 Doze 模式

OpenHarmony 系统为了降低功耗,会对后台应用的网络活动进行限制。

  • 现象:当用户按下 Home 键,几分钟后,Socket 连接可能就会断开,且不会立即收到通知。
  • 解决方案
    1. 申请后台任务:如果你做的是音乐播放器或下载器,可以申请 ohos.permission.KEEP_BACKGROUND_RUNNING
    2. 前台服务:对于普通聊天 App,尽量使用 Push Kit(鸿蒙推送服务)来唤醒 App,而不是依赖后台 WebSocket 长连接。推送到达后,再拉起 App 建立 WS 连接拉取消息。
    3. 生命周期感知:在 Flutter 的 WidgetsBindingObserver 中监听 AppLifecycleState
      • paused: 暂停心跳包发送,记录时间戳。
      • resumed: 检查上次暂停时间,如果超过 1 分钟,主动断开重连。

4.3 流量与电量监控

频繁的心跳包(如 1s 一次)会阻止 CPU 进入深度休眠(Deep Sleep),导致手机发热耗电。

  • 建议心跳间隔:移动网络下 30s - 60s,WiFi 下 60s - 120s。
  • 智能心跳:如果用户正在交互(如打字),可以加密心跳;如果 App 闲置,拉大心跳间隔。

五、完整示例:鸿蒙实时股票看板

本示例展示如何在 OpenHarmony 上构建一个高频刷新的实时数据看板。

import 'package:flutter/material.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'dart:convert';

void main() {
  runApp(const MaterialApp(home: StockPage()));
}

class StockPage extends StatefulWidget {
  const StockPage({super.key});

  
  State<StockPage> createState() => _StockPageState();
}

class _StockPageState extends State<StockPage> {
  late WebSocketChannel _channel;
  // 模拟数据结构:{'symbol': 'AAPL', 'price': 150.2, 'change': 0.5}
  final Map<String, dynamic> _stocks = {};

  
  void initState() {
    super.initState();
    // 使用公共测试节点 (注:实际开发请换成你的服务器)
    _channel = WebSocketChannel.connect(
      Uri.parse('wss://echo.websocket.events'),
    );
    
    // 模拟服务端推送:每秒发送一条随机变动的股价
    _mockServerPush();

    _channel.stream.listen((data) {
      try {
        final update = jsonDecode(data);
        setState(() {
          _stocks[update['symbol']] = update;
        });
      } catch (e) {
        // ignore non-json
      }
    });
  }

  void _mockServerPush() async {
    while (mounted) {
      await Future.delayed(const Duration(milliseconds: 500));
      // 模拟服务器下发数据
      final mockData = jsonEncode({
        'symbol': 'TSLA',
        'price': 200 + (DateTime.now().millisecond % 100) / 10.0,
        'change': (DateTime.now().second % 2 == 0) ? 1.2 : -0.5
      });
      // 发给自己(Echo 模式)
      _channel.sink.add(mockData);
      
      final mockData2 = jsonEncode({
        'symbol': 'AAPL',
        'price': 150 + (DateTime.now().millisecond % 50) / 10.0,
        'change': (DateTime.now().second % 2 == 0) ? -0.8 : 0.3
      });
      _channel.sink.add(mockData2);
    }
  }

  
  void dispose() {
    _channel.sink.close();
    super.dispose();
  }

  
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(title: const Text('Ohos Stock Market')),
      body: ListView(
        children: _stocks.entries.map((entry) {
          final stock = entry.value;
          final isUp = (stock['change'] as num) >= 0;
          return ListTile(
            leading: CircleAvatar(
              backgroundColor: isUp ? Colors.red : Colors.green,
              child: Text(stock['symbol'][0]),
            ),
            title: Text(stock['symbol'], style: const TextStyle(fontWeight: FontWeight.bold)),
            subtitle: Text('Nasdaq'),
            trailing: Column(
              mainAxisAlignment: MainAxisAlignment.center,
              crossAxisAlignment: CrossAxisAlignment.end,
              children: [
                Text(
                  '\$${(stock['price'] as num).toStringAsFixed(2)}',
                  style: TextStyle(
                    color: isUp ? Colors.red : Colors.green,
                    fontSize: 18,
                    fontWeight: FontWeight.bold
                  ),
                ),
                Text(
                  '${isUp ? '+' : ''}${(stock['change'] as num).toStringAsFixed(2)}%',
                  style: TextStyle(
                    color: isUp ? Colors.red : Colors.green,
                    fontSize: 12,
                  ),
                ),
              ],
            ),
          );
        }).toList(),
      ),
    );
  }
}

在这里插入图片描述

六、总结

web_socket_channel 是 Flutter 网络编程的瑞士军刀。虽然它只是一个基础库,但结合 rxdartStream 机制,我们可以构建出极其健壮的实时应用。

对于 OpenHarmony 开发者,掌握 WebSocket 的意义不仅在于聊天,更在于理解全双工通信背后的异步思维。

进阶学习路线

  1. Protocol Buffers: 学会使用 Protobuf 替代 JSON 传输,可以将流量节省 70%,极大提升弱网体验。
  2. GRPC: 谷歌的高性能 RPC 框架,底层也是 HTTP/2 长连接。如果你的业务非常复杂,gRPC 可能是比 WebSocket 更规范的选择。

Logo

作为“人工智能6S店”的官方数字引擎,为AI开发者与企业提供一个覆盖软硬件全栈、一站式门户。

更多推荐