优化配置,修复 demo bug

1,network 框架完善
2,websocket 机制完善
3,设计文档整理到架构文档
4,脚本,配置完善
This commit is contained in:
Cody
2026-03-07 14:58:10 +08:00
parent f8a118af73
commit 0ee2c8c63c
82 changed files with 2704 additions and 1045 deletions

View File

@@ -1,12 +1,15 @@
import 'package:dio/dio.dart';
import 'package:networks_sdk/src/data/datasources/http/interceptor/auth_interceptor.dart';
import 'package:networks_sdk/src/data/datasources/http/interceptor/encryption_interceptor.dart';
import 'package:networks_sdk/src/data/datasources/http/interceptor/logging_interceptor.dart';
import 'package:networks_sdk/src/data/datasources/http/interceptor/retry_interceptor.dart';
import 'package:networks_sdk/src/domain/entities/api_error.dart';
import 'package:networks_sdk/src/presentation/wiring/api_config.dart';
/// REST API 客户端
/// 基于 Dio提供 `executeRequest<T>` 唯一入口
/// 基于 Dio提供请求执行入口
///
/// 拦截器链顺序Auth → Encryption → 自定义 → Retry → Logging
///
/// 使用方式:
/// ```dart
@@ -28,9 +31,10 @@ class ApiClient {
receiveTimeout: const Duration(seconds: 60),
);
// 挂载拦截器顺序Auth → 自定义 → Retry → Logging
// 挂载拦截器顺序Auth → Encryption → 自定义 → Retry → Logging
_dio.interceptors.addAll([
AuthInterceptor(config),
EncryptionInterceptor(config),
if (additionalInterceptors != null) ...additionalInterceptors,
RetryInterceptor(config: config, dio: _dio),
LoggingInterceptor(onLog: config.onLog),
@@ -49,16 +53,16 @@ class ApiClient {
return const ApiError.timeout();
case DioExceptionType.connectionError:
return const ApiError.noNetworkConnection();
case DioExceptionType.cancel:
return const ApiError.cancelled();
default:
if (e.response != null) {
return ApiError.apiError(
code: e.response!.statusCode ?? 0,
message: e.response!.statusMessage ??
e.message ??
'Request failed',
message: e.response!.statusMessage ?? e.message ?? 'Request failed',
);
}
return ApiError.networkError(e.message ?? 'Network error');
}
}
}
}

View File

@@ -0,0 +1,114 @@
import 'package:dio/dio.dart';
import 'package:networks_sdk/src/presentation/wiring/api_config.dart';
/// 加密拦截器(预留给 cipher_guard_sdk
///
/// 在拦截器链中位于 Auth 之后、Retry 之前:
/// `Auth → Encryption → Custom → Retry → Logging`
///
/// 回调为 null 时自动跳过,不影响正常请求流程。
/// 后续 cipher_guard_sdk 接入后App 层在 ApiConfig 中注入
/// `onEncryptRequest` / `onDecryptResponse` 即可启用加密。
///
/// ## 加密能力
///
/// 与简单的 body 加解密不同,本拦截器支持完整的请求改写:
/// - 路径加密(如 `/api/login` → `/api/hex(encrypt(login))`
/// - 请求体加密Map → base64 字符串)
/// - Header 注入X-Token、X-Signature、secret-key 等)
/// - Content-Type 覆盖application/json → text/plain
///
/// 加密回调接收原始 path、headers、body返回 [EncryptedRequest]
/// 拦截器根据非 null 字段覆盖请求。
class EncryptionInterceptor extends Interceptor {
final ApiConfig _config;
EncryptionInterceptor(this._config);
@override
void onRequest(
RequestOptions options,
RequestInterceptorHandler handler,
) async {
final encrypt = _config.onEncryptRequest;
if (encrypt == null) {
handler.next(options);
return;
}
try {
// 收集当前 headers转为 Map<String, String>
final currentHeaders = <String, String>{};
options.headers.forEach((key, value) {
if (value != null) currentHeaders[key] = value.toString();
});
final result = await encrypt(options.path, currentHeaders, options.data);
// 根据非 null 字段覆盖请求
if (result.path != null) {
options.path = result.path!;
}
if (result.body != null) {
options.data = result.body;
}
if (result.headers != null) {
options.headers.addAll(result.headers!);
}
if (result.contentType != null) {
options.contentType = result.contentType;
}
_config.onLog?.call(
'Request encrypted: ${options.path}',
tag: 'Encryption',
);
handler.next(options);
} catch (e) {
_config.onLog?.call('Request encryption failed: $e', tag: 'Encryption');
handler.reject(
DioException(
requestOptions: options,
message: 'Request encryption failed: $e',
),
);
}
}
@override
void onResponse(Response response, ResponseInterceptorHandler handler) async {
final decrypt = _config.onDecryptResponse;
if (decrypt == null) {
handler.next(response);
return;
}
// 跳过 null 响应
if (response.data == null) {
handler.next(response);
return;
}
try {
final decrypted = await decrypt(response.data as Object);
response.data = decrypted;
_config.onLog?.call(
'Response decrypted: ${response.requestOptions.path}',
tag: 'Encryption',
);
handler.next(response);
} catch (e) {
_config.onLog?.call('Response decryption failed: $e', tag: 'Encryption');
handler.reject(
DioException(
requestOptions: response.requestOptions,
response: response,
message: 'Response decryption failed: $e',
),
);
}
}
}

View File

@@ -1,35 +1,41 @@
import 'dart:async';
import 'dart:math';
import 'package:dio/dio.dart';
import 'package:networks_sdk/src/data/datasources/http/token_refresh_manager.dart';
import 'package:networks_sdk/src/presentation/wiring/api_config.dart';
/// 重试拦截器
///
/// 两层重试机制:
///
/// 1. **Token 刷新重试**onResponse
/// 检测 Token 过期响应 → 触发刷新回调 → 用新 Token 重试原请求
/// 检测 Token 过期响应 → 触发 [TokenRefreshManager] → 用新 Token 重试原请求
///
/// 2. **瞬态错误重试**onError
/// 5xx / 超时 / 连接失败 → 指数退避 + jitter → 自动重试
/// 由 [ApiConfig.maxRetries] 控制(默认 0 = 不启用)
///
/// 另外在 onResponse 中处理强制登出码和业务错误码。
///
/// 两层独立运作,可叠加。
class RetryInterceptor extends Interceptor {
final ApiConfig config;
final Dio dio;
/// Token 刷新锁(防止多个请求同时刷新)
bool _isRefreshing = false;
Completer<bool>? _refreshCompleter;
final TokenRefreshManager _tokenManager;
final _random = Random();
RetryInterceptor({required this.config, required this.dio});
RetryInterceptor({required this.config, required this.dio})
: _tokenManager = TokenRefreshManager(
onTokenRefresh: config.onTokenRefresh,
onLog: config.onLog,
timeout: config.tokenRefreshTimeout,
reuseWindow: config.tokenReuseWindow,
onGetTokenExpiry: config.onGetTokenExpiry,
proactiveRefreshThreshold: config.proactiveRefreshThreshold,
);
// ── Token 刷新重试 ────────────────────────────────────────────────────────
// ── 响应处理(Token 过期 / 强制登出 / 业务错误码)──────────────────────
@override
void onResponse(Response response, ResponseInterceptorHandler handler) {
@@ -40,13 +46,12 @@ class RetryInterceptor extends Interceptor {
final data = response.data as Map<String, dynamic>;
final code = _parseCode(data['code']);
final message = data['message'] as String? ?? '';
final requestPath = response.requestOptions.path;
// 检查强制登出
if (config.forceLogoutCodes.contains(code)) {
config.onLog?.call(
'Force logout detected (code: $code)',
tag: 'Network',
);
config.onLog?.call('Force logout detected (code: $code)', tag: 'Network');
config.onForceLogout?.call();
handler.reject(
DioException(
@@ -58,8 +63,9 @@ class RetryInterceptor extends Interceptor {
return;
}
// 检查 Token 过期
if (config.tokenExpiredCodes.contains(code)) {
// 检查 Token 过期(跳过已标记为 token 重试的请求,防止递归)
if (config.tokenExpiredCodes.contains(code) &&
response.requestOptions.extra['_isTokenRetry'] != true) {
config.onLog?.call(
'Token expired (code: $code), refreshing...',
tag: 'Network',
@@ -68,17 +74,27 @@ class RetryInterceptor extends Interceptor {
return;
}
// 业务错误码拦截:非 0 且不在特殊码集合中
if (code != 0 && config.onBusinessError != null) {
final handled = config.onBusinessError!(code, message, requestPath);
if (handled) {
// App 层已处理,正常传递响应
handler.next(response);
return;
}
}
handler.next(response);
}
/// 处理 Token 过期:刷新 + 重试
Future<void> _handleTokenExpired(
Response response,
ResponseInterceptorHandler handler,
) async {
final refreshSuccess = await _refreshToken();
Response response,
ResponseInterceptorHandler handler,
) async {
final newToken = await _tokenManager.refreshIfNeeded();
if (!refreshSuccess) {
if (newToken == null) {
config.onLog?.call('Token refresh failed', tag: 'Network');
config.onForceLogout?.call();
handler.reject(
@@ -91,12 +107,14 @@ class RetryInterceptor extends Interceptor {
return;
}
// 刷新成功,用新 token 重试原请求
// 刷新成功,更新 config 并用新 token 重试原请求
config.updateToken(newToken);
config.onLog?.call('Token refreshed, retrying...', tag: 'Network');
try {
final options = response.requestOptions;
// 更新 header 中的 token
options.headers['token'] = config.token;
options.headers['token'] = newToken;
// 标记为 token 重试请求,防止重试后再次进入 _handleTokenExpired 造成递归
options.extra['_isTokenRetry'] = true;
final retryResponse = await dio.fetch(options);
handler.resolve(retryResponse);
@@ -105,41 +123,6 @@ class RetryInterceptor extends Interceptor {
}
}
/// Token 刷新(串行锁)
/// 多个请求同时过期时,只刷新一次,其余等待
Future<bool> _refreshToken() async {
if (_isRefreshing) {
// 等待正在进行的刷新
return _refreshCompleter?.future ?? Future.value(false);
}
_isRefreshing = true;
_refreshCompleter = Completer<bool>();
try {
if (config.onTokenRefresh == null) {
_refreshCompleter!.complete(false);
return false;
}
final newToken = await config.onTokenRefresh!();
final success = newToken != null;
if (success) {
config.updateToken(newToken);
}
_refreshCompleter!.complete(success);
return success;
} catch (e) {
_refreshCompleter!.complete(false);
return false;
} finally {
_isRefreshing = false;
_refreshCompleter = null;
}
}
// ── 瞬态错误重试(指数退避 + jitter────────────────────────────────────
@override
@@ -162,7 +145,7 @@ class RetryInterceptor extends Interceptor {
final delayMs = _backoffDelay(attempt);
config.onLog?.call(
'Transient error, retry ${attempt + 1}/${config.maxRetries} '
'in ${delayMs}ms: ${options.path}',
'in ${delayMs}ms: ${options.path}',
tag: 'Retry',
);
@@ -184,7 +167,7 @@ class RetryInterceptor extends Interceptor {
case DioExceptionType.connectionError:
return true;
case DioExceptionType.badResponse:
// 5xx 服务端错误可重试
// 5xx 服务端错误可重试
final statusCode = err.response?.statusCode;
return statusCode != null && statusCode >= 500;
default:
@@ -198,7 +181,9 @@ class RetryInterceptor extends Interceptor {
int _backoffDelay(int attempt) {
final baseMs = config.retryBaseDelay.inMilliseconds;
final exponentialMs = min(baseMs * pow(2, attempt).toInt(), 30000);
final jitterMs = _random.nextInt((exponentialMs * 0.25).toInt().clamp(1, 7500));
final jitterMs = _random.nextInt(
(exponentialMs * 0.25).toInt().clamp(1, 7500),
);
return exponentialMs + jitterMs;
}

View File

@@ -0,0 +1,152 @@
import 'dart:async';
import 'package:networks_sdk/src/presentation/wiring/network_callbacks.dart';
/// Token 刷新管理器
///
/// 两种刷新模式:
///
/// 1. **被动刷新**[refreshIfNeeded])— 拦截器检测到 token 过期后调用
/// 2. **主动刷新**[proactivelyRefreshIfNeeded])— 解析 JWT exp
/// 距过期不足阈值时提前刷新,避免带过期 token 发请求
///
/// 两种模式共享串行锁和时间窗口保护:
/// - **串行锁** — 同一时刻只执行一次刷新,其余请求等待同一 Completer
/// - **时间窗口** — 刷新成功后 [reuseWindow] 内再次调用直接返回缓存 token
/// - **超时保护** — 刷新回调超过 [timeout] 自动失败,防止死锁
class TokenRefreshManager {
final OnTokenRefresh? onTokenRefresh;
final OnLog? onLog;
/// 刷新超时时间(防止 onTokenRefresh 卡住导致所有请求阻塞)
final Duration timeout;
/// 时间窗口:刷新成功后此时间内再次调用直接返回缓存 token
final Duration reuseWindow;
/// Token 过期时间解析App 层注入 JWT exp 解析逻辑)
final OnGetTokenExpiry? onGetTokenExpiry;
/// 主动刷新阈值:距过期不足此时间时提前刷新(默认 1 小时)
final Duration proactiveRefreshThreshold;
/// 当前正在进行的刷新任务null = 空闲)
Completer<String?>? _completer;
/// 上次刷新成功的时间戳
DateTime? _lastRefreshTime;
/// 上次刷新成功的 token时间窗口内复用
String? _lastToken;
TokenRefreshManager({
this.onTokenRefresh,
this.onLog,
this.timeout = const Duration(seconds: 10),
this.reuseWindow = const Duration(seconds: 3),
this.onGetTokenExpiry,
this.proactiveRefreshThreshold = const Duration(hours: 1),
});
/// 执行 token 刷新(如果需要)
///
/// 返回新 token刷新成功或在时间窗口内
/// 返回 null = 刷新失败或超时。
Future<String?> refreshIfNeeded() async {
// 1. 时间窗口:最近刷新过且未超时 → 直接返回缓存的 token
if (_isWithinReuseWindow()) {
_log('Token refreshed recently, reusing');
return _lastToken;
}
// 2. 有正在进行的刷新 → 等待同一 Completer
final existing = _completer;
if (existing != null) {
_log('Waiting for ongoing token refresh');
return existing.future;
}
// 3. 发起新的刷新
if (onTokenRefresh == null) {
_log('No onTokenRefresh callback configured');
return null;
}
final completer = Completer<String?>();
_completer = completer;
try {
final newToken = await onTokenRefresh!().timeout(
timeout,
onTimeout: () {
_log('Token refresh timed out after ${timeout.inSeconds}s');
return null;
},
);
final success = newToken != null && newToken.isNotEmpty;
if (success) {
_lastRefreshTime = DateTime.now();
_lastToken = newToken;
_log('Token refreshed successfully');
} else {
_log('Token refresh failed (null or empty token)');
}
// 先 complete 再清引用,确保等待者能拿到结果
completer.complete(success ? newToken : null);
return success ? newToken : null;
} catch (e) {
_log('Token refresh error: $e');
completer.complete(null);
return null;
} finally {
// 清理引用Completer 已 complete等待者不受影响
_completer = null;
}
}
/// 检查 token 是否即将过期,是则主动刷新
///
/// 解析 [currentToken] 的过期时间,距过期不足 [proactiveRefreshThreshold]
/// 时调用 [refreshIfNeeded] 刷新。复用串行锁和超时保护。
///
/// 返回新 token已刷新或 null不需要刷新 / 刷新失败 / 无法解析过期时间)。
Future<String?> proactivelyRefreshIfNeeded(String? currentToken) async {
if (currentToken == null || onGetTokenExpiry == null) return null;
final expiry = onGetTokenExpiry!(currentToken);
if (expiry == null) return null;
final remaining = expiry.difference(DateTime.now());
if (remaining > proactiveRefreshThreshold) {
_log(
'Token valid (expires in ${remaining.inMinutes}min), skip proactive refresh',
);
return null;
}
_log(
'Token expiring soon (${remaining.inMinutes}min left), proactively refreshing',
);
return refreshIfNeeded();
}
/// 重置状态(登出时调用)
void reset() {
_lastRefreshTime = null;
_lastToken = null;
// 不清理 _completer让正在等待的请求正常结束
}
bool _isWithinReuseWindow() {
final lastTime = _lastRefreshTime;
if (lastTime == null) return false;
return DateTime.now().difference(lastTime) < reuseWindow;
}
void _log(String message) {
onLog?.call(message, tag: 'TokenRefresh');
}
}

View File

@@ -1,14 +1,25 @@
import 'dart:io';
import 'package:dio/dio.dart';
import 'package:networks_sdk/src/data/datasources/http/api_client.dart';
import 'package:networks_sdk/src/data/dto/api_requestable.dart';
import 'package:networks_sdk/src/domain/entities/api_error.dart';
import 'package:networks_sdk/src/domain/entities/api_request_type.dart';
import 'package:networks_sdk/src/presentation/wiring/api_config.dart';
import 'package:networks_sdk/src/presentation/wiring/network_callbacks.dart';
import '../../../networks_sdk_platform_interface.dart';
import '../../domain/entities/http_method.dart';
class NetworksSdkMethodChannelDataSource
{
/// 网络层数据源
///
/// 封装 [ApiClient],提供两种请求入口:
/// - [executeRequest] — 统一请求入口(标准 / Upload / 流式)
/// - [executeDownload] — 带进度的文件下载(支持断点续传)
///
/// 流式SSE请求也走 [executeRequest],由业务 Request 类 override
/// `decodeResponse` 处理 SSE 解析。SDK 内部根据
/// `requestType == ApiRequestType.stream` 自动切换 `ResponseType.plain`。
class NetworksSdkMethodChannelDataSource {
final NetworksSdkPlatform platform;
late ApiClient apiClient;
@@ -16,44 +27,51 @@ class NetworksSdkMethodChannelDataSource
NetworksSdkMethodChannelDataSource(this.platform);
Future<String?> getPlatformVersion() async {
return await getPlatformVersion();
return await platform.getPlatformVersion();
}
void initialize(ApiConfig apiConfig){
void initialize(ApiConfig apiConfig) {
apiClient = ApiClient(config: apiConfig);
}
/// 执行 API 请求 — 唯一入口
// ══════════════════════════════════════════════════════════════════════════
// 统一请求入口
// ══════════════════════════════════════════════════════════════════════════
/// 执行 API 请求 — 统一入口
///
/// 流程:网络前置检查 → 构建 URL → 设置元数据 → 执行请求 → 解码响应 → 错误映射
/// 拦截器负责header 注入、Token 刷新重试、日志
/// 支持三种请求类型,由 `request.requestType` 控制行为:
/// - `request` / `login` — 标准 JSON 请求
/// - `upload` — 文件上传FormData / 二进制)
/// - `stream` — SSE / chunked内部用 `ResponseType.plain` 获取原始文本,
/// 由业务 Request 类 override `decodeResponse` 处理 SSE 解析
///
/// 流程:网络前置检查 → 构建 URL → 设置元数据 → 执行请求
/// → 响应变换可选stream 类型跳过)→ 解码响应 → 错误映射
///
/// 拦截器负责header 注入、加密/解密、Token 刷新重试、业务错误拦截、日志
///
/// Upload 类型支持两种模式:
/// - 自有后端上传path 为相对路径,自动拼接 baseURL
/// - S3 presigned URLpath 以 http 开头,直接使用全路径
Future<T?> executeRequest<T>(ApiRequestable<T> request) async {
// 前置检查:网络不可用时直接抛错,避免无效请求
if (apiClient.config.onCheckNetworkAvailable != null) {
final available = await apiClient.config.onCheckNetworkAvailable!();
if (!available) {
apiClient.config.onLog?.call(
'Network unavailable, abort request: ${request.path}',
tag: 'ApiClient',
);
throw const ApiError.noNetworkConnection();
}
}
Future<T?> executeRequest<T>(
ApiRequestable<T> request, {
CancelToken? cancelToken,
}) async {
await _checkNetwork(request.path);
try {
// Upload 且 path 以 http 开头 → 直接用全路径S3 presigned URL
// 否则 → 拼接 baseURL
final isUpload = request.requestType == ApiRequestType.upload;
final isStream = request.requestType == ApiRequestType.stream;
final path = request.path;
final url = (isUpload && path.startsWith('http')) ? path : '${apiClient.config.baseURL}$path';
final url = (isUpload && path.startsWith('http'))
? path
: '${apiClient.config.baseURL}$path';
// 将请求元数据写入 extra供拦截器读取
final options = Options(
method: request.method.value,
// 流式请求用 plainDio 返回原始文本,由 decodeResponse 解析 SSE
responseType: isStream ? ResponseType.plain : null,
extra: {
'requestType': request.requestType,
'includeToken': request.includeToken,
@@ -62,19 +80,22 @@ class NetworksSdkMethodChannelDataSource
);
// 访问 parameters 触发代码生成器的 fromJson 注册
// @ApiRequest 生成的 mixin 在 parameters getter 中注册响应类型)
final params = request.parameters;
// GET → queryParametersPOST/PUT/DELETE/PATCH → JSON bodyUpload → uploadData
final isGet = request.method == HttpMethod.get;
final response = await apiClient.dio.request(
url,
data: isUpload ? request.uploadData : (isGet ? null : params),
queryParameters: isGet ? params : null,
options: options,
cancelToken: cancelToken,
);
// 解码响应Upload 类型通常需要 override decodeResponse
// 响应变换stream 类型由 decodeResponse 自行处理,不做变换
if (!isStream) {
_applyResponseTransform(response);
}
return request.decodeResponse(response);
} on DioException catch (e) {
throw apiClient.mapDioError(e);
@@ -85,4 +106,162 @@ class NetworksSdkMethodChannelDataSource
}
}
// ══════════════════════════════════════════════════════════════════════════
// 文件下载
// ══════════════════════════════════════════════════════════════════════════
/// 下载文件到本地路径
///
/// 支持进度回调和断点续传(通过 HTTP Range header 实现)。
///
/// 非续传模式直接用 Dio.download高效内部流式写入
/// 续传模式用 stream + FileMode.append因为 Dio.download 始终从
/// 文件头部写入,无法正确追加到已下载部分之后。
///
/// [url] — 下载 URL完整路径或相对路径相对路径自动拼接 baseURL
/// [savePath] — 本地保存路径
/// [onProgress] — 下载进度回调
/// [cancelToken] — 取消令牌
/// [resume] — 是否断点续传(文件已存在时从断点继续下载)
/// [headers] — 额外请求头
Future<void> executeDownload({
required String url,
required String savePath,
OnDownloadProgress? onProgress,
CancelToken? cancelToken,
bool resume = false,
Map<String, String>? headers,
}) async {
await _checkNetwork(url);
try {
final fullUrl = url.startsWith('http')
? url
: '${apiClient.config.baseURL}$url';
final extraHeaders = <String, String>{};
if (headers != null) extraHeaders.addAll(headers);
// 断点续传:读取已下载部分的大小,设置 Range header
int startBytes = 0;
if (resume) {
final file = File(savePath);
if (file.existsSync()) {
startBytes = file.lengthSync();
extraHeaders['Range'] = 'bytes=$startBytes-';
}
}
if (resume && startBytes > 0) {
// 续传模式stream + append确保新数据追加到文件末尾
await _downloadWithResume(
url: fullUrl,
savePath: savePath,
startBytes: startBytes,
headers: extraHeaders,
onProgress: onProgress,
cancelToken: cancelToken,
);
} else {
// 普通下载Dio.download高效内部流式写入
await apiClient.dio.download(
fullUrl,
savePath,
cancelToken: cancelToken,
deleteOnError: true,
options: Options(
headers: extraHeaders.isEmpty ? null : extraHeaders,
extra: {
'requestType': ApiRequestType.download,
'includeToken': true,
},
),
onReceiveProgress: onProgress != null
? (received, total) => onProgress(received, total)
: null,
);
}
} on DioException catch (e) {
throw apiClient.mapDioError(e);
} on ApiError {
rethrow;
} catch (e) {
throw ApiError.unknown(e.toString());
}
}
/// 断点续传下载stream 响应 + FileMode.append
///
/// Dio.download 内部用 FileMode.write从头覆盖无法正确续传。
/// 这里手动读流并追加写入文件。
Future<void> _downloadWithResume({
required String url,
required String savePath,
required int startBytes,
required Map<String, String> headers,
OnDownloadProgress? onProgress,
CancelToken? cancelToken,
}) async {
final response = await apiClient.dio.get<ResponseBody>(
url,
cancelToken: cancelToken,
options: Options(
responseType: ResponseType.stream,
headers: headers.isEmpty ? null : headers,
extra: {'requestType': ApiRequestType.download, 'includeToken': true},
),
);
final stream = response.data?.stream;
if (stream == null) return;
// Content-Length 是本次传输量(不含已下载部分)
final contentLength =
int.tryParse(response.headers.value('content-length') ?? '') ?? -1;
final totalBytes = contentLength > 0 ? contentLength + startBytes : -1;
final file = File(savePath);
final raf = file.openSync(mode: FileMode.writeOnlyAppend);
int received = startBytes;
try {
await for (final chunk in stream) {
raf.writeFromSync(chunk);
received += chunk.length;
onProgress?.call(received, totalBytes);
}
} finally {
raf.closeSync();
}
}
// ══════════════════════════════════════════════════════════════════════════
// 内部辅助
// ══════════════════════════════════════════════════════════════════════════
/// 网络前置检查,不可用时直接抛 [ApiError.noNetworkConnection]
Future<void> _checkNetwork(String path) async {
if (apiClient.config.onCheckNetworkAvailable != null) {
final available = await apiClient.config.onCheckNetworkAvailable!();
if (!available) {
apiClient.config.onLog?.call(
'Network unavailable, abort request: $path',
tag: 'ApiClient',
);
throw const ApiError.noNetworkConnection();
}
}
}
/// 应用响应变换(如果 App 层注入了 onTransformResponse
void _applyResponseTransform(Response response) {
final transform = apiClient.config.onTransformResponse;
if (transform == null) return;
if (response.data is! Map<String, dynamic>) return;
final transformed = transform(response.data as Map<String, dynamic>);
if (transformed != null) {
response.data = transformed;
}
}
}

View File

@@ -1,6 +1,8 @@
import 'dart:async';
import 'dart:convert';
import 'dart:io' as io;
import 'dart:math';
import 'dart:typed_data';
import 'package:networks_sdk/networks_sdk.dart';
import 'package:web_socket_channel/io.dart';
@@ -9,10 +11,12 @@ import 'package:web_socket_channel/web_socket_channel.dart';
/// WebSocket 长连接客户端
///
/// 提供:
/// - 连接 / 断连 / 自动重连(指数退避)
/// - 连接 / 断连 / 自动重连(指数退避,支持无限重连
/// - 双层心跳(底层 ping + 应用层 heartbeat
/// - Stream 输出(消息、连接状态、错误)
/// - Stream 输出(JSON 消息、原始字符串、二进制、连接状态、错误)
/// - 生命周期感知(前后台切换)
/// - Token 热更新(不断连)
/// - 消息加密/解密钩子(预留给 cipher_guard_sdk
///
/// ## 使用方式
///
@@ -28,6 +32,9 @@ import 'package:web_socket_channel/web_socket_channel.dart';
/// // 发消息
/// await client.send({'type': 'chat', 'data': {...}});
///
/// // Token 热更新(不断连,下次重连自动使用新 token
/// client.updateToken('new_token');
///
/// // 断连
/// await client.disconnect();
/// ```
@@ -56,8 +63,9 @@ class SocketClient {
// ── Stream Controllers ──
final _messageController = StreamController<Map<String, dynamic>>.broadcast();
final _rawMessageController = StreamController<String>.broadcast();
final _binaryMessageController = StreamController<Uint8List>.broadcast();
final _connectionStateController =
StreamController<SocketConnectionState>.broadcast();
StreamController<SocketConnectionState>.broadcast();
final _errorController = StreamController<SocketError>.broadcast();
SocketClient({required this.config});
@@ -93,12 +101,20 @@ class SocketClient {
}
/// 当前是否已连接
bool get isConnected =>
_connectionState == SocketConnectionState.connected;
bool get isConnected => _connectionState == SocketConnectionState.connected;
/// 当前连接状态
SocketConnectionState get connectionState => _connectionState;
/// Token 热更新(不断开连接)
///
/// 仅更新内部持有的 token下次重连时自动使用新 token。
/// 适用于 token 刷新后同步到 WebSocket 的场景。
void updateToken(String token) {
_currentToken = token;
_log('Token updated (no reconnect)');
}
// ══════════════════════════════════════════════════════════════════════════
// 公开 API — 发送
// ══════════════════════════════════════════════════════════════════════════
@@ -109,6 +125,8 @@ class SocketClient {
}
/// 发送原始字符串
///
/// 如果配置了 [SocketConfig.onEncryptMessage],发送前自动加密。
Future<bool> sendString(String message) async {
if (!isConnected || _channel == null) {
_emitError(SocketError.sendFailed('Not connected'));
@@ -116,7 +134,27 @@ class SocketClient {
}
try {
_channel!.sink.add(message);
String payload = message;
if (config.onEncryptMessage != null) {
payload = await config.onEncryptMessage!(message);
}
_channel!.sink.add(payload);
return true;
} catch (e) {
_emitError(SocketError.sendFailed(e.toString()));
return false;
}
}
/// 发送二进制数据
Future<bool> sendBytes(List<int> bytes) async {
if (!isConnected || _channel == null) {
_emitError(SocketError.sendFailed('Not connected'));
return false;
}
try {
_channel!.sink.add(bytes);
return true;
} catch (e) {
_emitError(SocketError.sendFailed(e.toString()));
@@ -134,6 +172,9 @@ class SocketClient {
/// 原始字符串消息流JSON 解析失败的也走这里)
Stream<String> get rawMessageStream => _rawMessageController.stream;
/// 二进制消息流
Stream<Uint8List> get binaryMessageStream => _binaryMessageController.stream;
/// 连接状态变化流
Stream<SocketConnectionState> get connectionStateStream =>
_connectionStateController.stream;
@@ -171,6 +212,7 @@ class SocketClient {
await _doDisconnect(reason: 'Dispose');
await _messageController.close();
await _rawMessageController.close();
await _binaryMessageController.close();
await _connectionStateController.close();
await _errorController.close();
}
@@ -197,24 +239,36 @@ class SocketClient {
_log('Connecting to $url');
try {
// 构建最终 URL(拼接 token
final connectUri = _currentToken != null
? uri.replace(
queryParameters: {
...uri.queryParameters,
'token': _currentToken!,
},
)
: uri;
// 构建最终连接 URL
//
// 有 onBuildConnectUrl 回调时App 层完全控制 URL路径加密、
// token 加密、添加 cipher 参数等)。
// 无回调时使用默认行为URL 后追加 ?token=xxx。
final String connectUrlString;
// 创建 WebSocket 连接
_channel = IOWebSocketChannel.connect(
connectUri,
pingInterval: config.pingInterval,
);
if (config.onBuildConnectUrl != null) {
connectUrlString = config.onBuildConnectUrl!(url, _currentToken);
} else {
final connectUri = _currentToken != null
? uri.replace(
queryParameters: {
...uri.queryParameters,
'token': _currentToken!,
},
)
: uri;
connectUrlString = connectUri.toString();
}
// 等待连接就绪
await _channel!.ready.timeout(config.connectTimeout);
// 创建 WebSocket 连接(通过 dart:io 支持压缩选项)
final rawSocket = await io.WebSocket.connect(
connectUrlString,
compression: config.enableCompression
? io.CompressionOptions.compressionDefault
: io.CompressionOptions.compressionOff,
).timeout(config.connectTimeout);
rawSocket.pingInterval = config.pingInterval;
_channel = IOWebSocketChannel(rawSocket);
_log('Connected');
_updateConnectionState(SocketConnectionState.connected);
@@ -270,26 +324,45 @@ class SocketClient {
// 内部 — 消息处理
// ══════════════════════════════════════════════════════════════════════════
void _handleMessage(dynamic data) {
void _handleMessage(dynamic data) async {
// 二进制消息
if (data is List<int>) {
_binaryMessageController.add(
data is Uint8List ? data : Uint8List.fromList(data),
);
return;
}
if (data is! String) {
// 非字符串消息(如二进制),走 rawMessageStream
_rawMessageController.add(data.toString());
return;
}
// 检查 pong 心跳回复
if (data == 'pong') {
// 解密(如果配置了解密回调)
String text = data;
if (config.onDecryptMessage != null) {
try {
text = await config.onDecryptMessage!(data);
} catch (e) {
_log('Message decryption failed: $e');
_rawMessageController.add(data);
return;
}
}
// 检查 pong 心跳回复(解密后检查,加密场景下也能正确匹配)
if (text == 'pong') {
_onPongReceived();
return;
}
// 尝试 JSON 解析
try {
final json = jsonDecode(data) as Map<String, dynamic>;
final json = jsonDecode(text) as Map<String, dynamic>;
_messageController.add(json);
} catch (_) {
// JSON 解析失败,走原始消息流
_rawMessageController.add(data);
_rawMessageController.add(text);
}
}
@@ -328,11 +401,21 @@ class SocketClient {
_waitingForPong = false;
}
void _sendPing() {
void _sendPing() async {
if (_waitingForPong) return;
_waitingForPong = true;
_channel?.sink.add('ping');
// 加密场景下 ping 也要加密,与 pong 解密对称
String pingPayload = 'ping';
if (config.onEncryptMessage != null) {
try {
pingPayload = await config.onEncryptMessage!('ping');
} catch (e) {
_log('Ping encryption failed: $e');
}
}
_channel?.sink.add(pingPayload);
// 启动 pong 超时计时器
_pongTimeoutTimer = Timer(config.pongTimeout, () {
@@ -360,7 +443,9 @@ class SocketClient {
if (_manualDisconnect || !config.autoReconnect || _isBackground) return;
if (_connectionState == SocketConnectionState.reconnecting) return;
if (_reconnectAttempts >= config.maxReconnectAttempts) {
// 非无限重连模式下检查重连次数上限
if (!config.unlimitedReconnect &&
_reconnectAttempts >= config.maxReconnectAttempts) {
_log('Max reconnect attempts reached ($_reconnectAttempts)');
_reconnectAttempts = 0;
return;
@@ -375,11 +460,16 @@ class SocketClient {
pow(2, _reconnectAttempts).toInt() * 1000,
config.maxReconnectDelay.inMilliseconds,
);
final jitterMs = _random.nextInt((baseDelayMs * 0.25).toInt().clamp(1, 7500));
final jitterMs = _random.nextInt(
(baseDelayMs * 0.25).toInt().clamp(1, 7500),
);
final delay = Duration(milliseconds: baseDelayMs + jitterMs);
_log('Reconnecting in ${delay.inMilliseconds}ms '
'(attempt $_reconnectAttempts/${config.maxReconnectAttempts})');
final attemptsInfo = config.unlimitedReconnect
? 'attempt $_reconnectAttempts/unlimited'
: 'attempt $_reconnectAttempts/${config.maxReconnectAttempts}';
_log('Reconnecting in ${delay.inMilliseconds}ms ($attemptsInfo)');
_reconnectTimer = Timer(delay, () async {
// 重连前检查网络
@@ -393,6 +483,17 @@ class SocketClient {
}
}
// 重连前回调App 层刷新即将过期的 token 等)
if (config.onBeforeReconnect != null) {
try {
await config.onBeforeReconnect!();
} catch (e) {
_log('onBeforeReconnect failed: $e, skip this reconnect');
_startReconnect();
return;
}
}
_doConnect();
});
}