feat(chat): 发收消息全量实现 (#25~#28)
- 移除 @riverpod/@freezed 注解依赖,全部改为手写 Provider(无需 build_runner) · LoginState 改为纯 Dart,LoginViewModel/ThemeViewModel/ChatViewModel 改为 Notifier · UserNotifier 改为 FamilyAsyncNotifier<User?,int>,mini_app_provider 改为手写 Provider · 15 个 StreamProvider/StreamProvider.family 从 @riverpod 迁移至手写 - 发送消息(#25) · SendMessageRequest/SendMessageResponse DTO · SendMessageUseCase:乐观写入 DB → HTTP POST → 更新 Chat 摘要 - 接收消息 WS(#26) · WsMessageService:监听 mode2 WS 帧 → HTTP 补拉 → DB 写入 → Chat 更新 · FetchHistoryRequest/FetchHistoryResponse DTO(GET /app/api/chat/history) · FetchHistoryUseCase:拉取 → insertOrReplaceAll - DI 装配(chat_service_providers.dart) · wsMessageServiceProvider、sendMessageUseCaseProvider、fetchHistoryUseCaseProvider - 聊天列表页(#27) · ChatListViewModel(Notifier<void>)+ chat_page.dart 真实会话列表 UI · ListTile:头像首字母、最新消息摘要、未读角标、时间格式化 - 聊天详情页(#28) · ChatDetailViewModel(FamilyNotifier<ChatDetailState,int>)+ chat_detail_page.dart · 消息气泡(自己/他人分左右)、底部输入框、发送状态与错误提示 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
135
apps/im_app/lib/core/services/ws_message_service.dart
Normal file
135
apps/im_app/lib/core/services/ws_message_service.dart
Normal file
@@ -0,0 +1,135 @@
|
||||
import 'dart:async';
|
||||
|
||||
import 'package:flutter/foundation.dart';
|
||||
import 'package:networks_sdk/networks_sdk.dart';
|
||||
|
||||
import 'package:im_app/core/services/socket_manager.dart';
|
||||
import 'package:im_app/data/remote/fetch_history_request.dart';
|
||||
import 'package:im_app/domain/repositories/chat_repository.dart';
|
||||
import 'package:im_app/domain/repositories/message_repository.dart';
|
||||
|
||||
/// WS 实时消息接收服务
|
||||
///
|
||||
/// 监听 [SocketManager.messageStream],解析 mode2 协议帧:
|
||||
/// ```json
|
||||
/// {
|
||||
/// "chat": { "r": [{ "id": <chatId>, "msg_idx": N, "typ": N, "last_msg": "..." }] },
|
||||
/// "message_realtime_pb": <bytes>
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
/// 每帧含 `chat.r` 时,通过 HTTP GET `/app/api/chat/history` 拉取完整消息内容,
|
||||
/// 写入 [MessageRepository],再更新 [ChatRepository] 的 lastMsg。
|
||||
/// UI 层通过 `StreamProvider` 监听 DB 变化自动更新,无需额外信令。
|
||||
///
|
||||
/// ## 生命周期
|
||||
///
|
||||
/// 由 [WsMessageServiceProvider] 管理,登录后随 SocketManager 一同启动,
|
||||
/// 登出时 Provider dispose 触发 [stop]。
|
||||
class WsMessageService {
|
||||
final SocketManager _socketManager;
|
||||
final NetworksSdkApi _apiClient;
|
||||
final MessageRepository _messageRepo;
|
||||
final ChatRepository _chatRepo;
|
||||
|
||||
StreamSubscription<Map<String, dynamic>>? _sub;
|
||||
|
||||
WsMessageService({
|
||||
required SocketManager socketManager,
|
||||
required NetworksSdkApi apiClient,
|
||||
required MessageRepository messageRepo,
|
||||
required ChatRepository chatRepo,
|
||||
}) : _socketManager = socketManager,
|
||||
_apiClient = apiClient,
|
||||
_messageRepo = messageRepo,
|
||||
_chatRepo = chatRepo;
|
||||
|
||||
// ── 生命周期 ──────────────────────────────────────────────────────────────
|
||||
|
||||
void start() {
|
||||
_sub?.cancel();
|
||||
_sub = _socketManager.messageStream.listen(
|
||||
_handleFrame,
|
||||
onError: (e) => debugPrint('[WsMessageService] stream error: $e'),
|
||||
);
|
||||
debugPrint('[WsMessageService] started');
|
||||
}
|
||||
|
||||
void stop() {
|
||||
_sub?.cancel();
|
||||
_sub = null;
|
||||
debugPrint('[WsMessageService] stopped');
|
||||
}
|
||||
|
||||
// ── 帧处理 ────────────────────────────────────────────────────────────────
|
||||
|
||||
Future<void> _handleFrame(Map<String, dynamic> frame) async {
|
||||
try {
|
||||
final chatPayload = frame['chat'] as Map<String, dynamic>?;
|
||||
if (chatPayload == null) return;
|
||||
|
||||
final rList = chatPayload['r'] as List<dynamic>?;
|
||||
if (rList == null || rList.isEmpty) return;
|
||||
|
||||
for (final item in rList) {
|
||||
final entry = item as Map<String, dynamic>?;
|
||||
if (entry == null) continue;
|
||||
|
||||
final chatId = (entry['id'] as num?)?.toInt();
|
||||
final msgIdx = (entry['msg_idx'] as num?)?.toInt();
|
||||
if (chatId == null || msgIdx == null) continue;
|
||||
|
||||
await _fetchAndSaveMessages(chatId: chatId, anchorIdx: msgIdx);
|
||||
await _updateChatMeta(chatId: chatId, entry: entry);
|
||||
}
|
||||
} catch (e, st) {
|
||||
debugPrint('[WsMessageService] _handleFrame error: $e\n$st');
|
||||
}
|
||||
}
|
||||
|
||||
/// 通过 HTTP 拉取消息并写入 DB
|
||||
Future<void> _fetchAndSaveMessages({
|
||||
required int chatId,
|
||||
required int anchorIdx,
|
||||
}) async {
|
||||
try {
|
||||
final response = await _apiClient.executeRequest(
|
||||
FetchHistoryRequest(chatId: chatId, chatIdx: anchorIdx, limit: 20),
|
||||
);
|
||||
if (response == null || response.messages.isEmpty) return;
|
||||
|
||||
final entities = response.messages.map((m) => m.toEntity()).toList();
|
||||
await _messageRepo.insertOrReplaceAll(entities);
|
||||
debugPrint(
|
||||
'[WsMessageService] saved ${entities.length} messages for chat $chatId',
|
||||
);
|
||||
} catch (e) {
|
||||
debugPrint('[WsMessageService] fetch error for chat $chatId: $e');
|
||||
}
|
||||
}
|
||||
|
||||
/// 更新聊天列表中对应 Chat 的 lastMsg / lastTyp / msgIdx
|
||||
Future<void> _updateChatMeta({
|
||||
required int chatId,
|
||||
required Map<String, dynamic> entry,
|
||||
}) async {
|
||||
try {
|
||||
final existing = await _chatRepo.getChat(chatId);
|
||||
if (existing == null) return;
|
||||
|
||||
final lastMsg = entry['last_msg'] as String?;
|
||||
final lastTyp = (entry['typ'] as num?)?.toInt();
|
||||
final msgIdx = (entry['msg_idx'] as num?)?.toInt();
|
||||
|
||||
await _chatRepo.updateChat(
|
||||
existing.copyWith(
|
||||
lastMsg: lastMsg ?? existing.lastMsg,
|
||||
lastTyp: lastTyp ?? existing.lastTyp,
|
||||
msgIdx: msgIdx ?? existing.msgIdx,
|
||||
),
|
||||
);
|
||||
} catch (e) {
|
||||
debugPrint('[WsMessageService] updateChatMeta error: $e');
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user