diff --git a/.DS_Store b/.DS_Store deleted file mode 100644 index a0cc682e9f..0000000000 Binary files a/.DS_Store and /dev/null differ diff --git a/.gitignore b/.gitignore index 49330ee16f..ae493b8dad 100644 --- a/.gitignore +++ b/.gitignore @@ -51,4 +51,5 @@ rebel.xml application-my.yaml /yudao-ui-app/unpackage/ +.DS_Store **/.DS_Store diff --git a/script/livekit-poc/README.md b/script/livekit-poc/README.md new file mode 100644 index 0000000000..104cdb2654 --- /dev/null +++ b/script/livekit-poc/README.md @@ -0,0 +1,32 @@ +# LiveKit Server PoC + +最小可用的 LiveKit Server 自部署验证环境,用于零期 PoC。 + +## 启动 + +```bash +cd tools/livekit-poc +docker compose up -d +bash verify.sh +``` + +## 端口 + +- 7880:HTTP / WebSocket 信令; +- 7881:WebRTC TCP fallback; +- 7882/UDP:WebRTC 媒体; +- macOS / Windows:当前 `docker-compose.yml` 走端口映射模式,webhook URL 用 `host.docker.internal:48080` 让容器访问到宿主机 yudao 后端; +- macOS 上 host network(`network_mode: host`)需要 Docker Desktop 4.34+ 并在 Settings → Resources → Network 勾选「Enable host networking」,老版本静默失败(容器跑得起来但端口完全不通); +- Linux:可以把 `docker-compose.yml` 改成 `network_mode: host` + 删 `ports:` 段,并把 `livekit.yaml` 的 webhook URL 改为 `http://127.0.0.1:48080/admin-api/im/livekit/webhook`。 + +## 凭据 (仅 PoC,勿用于生产) + +- `LIVEKIT_KEYS=devkey: secret-poc-key-min-32-chars-required-here` +- API Key:`devkey` +- API Secret:`secret-poc-key-min-32-chars-required-here` + +生产环境必须改用强随机 secret,并通过 `--config /etc/livekit.yaml` 加载。 + +## 浏览器联调 + +`verify.sh` 跑完会输出一个 `meet.livekit.io` 链接,用两个浏览器(或两台机器)打开同一链接即可看到对方画面。 diff --git a/script/livekit-poc/docker-compose.yml b/script/livekit-poc/docker-compose.yml new file mode 100644 index 0000000000..893daf2d38 --- /dev/null +++ b/script/livekit-poc/docker-compose.yml @@ -0,0 +1,20 @@ +services: + livekit: + image: docker.m.daocloud.io/livekit/livekit-server:latest + container_name: yudao-livekit-dev + restart: unless-stopped + # 端口映射模式 + # macOS / Windows 必走这种方式:Docker Desktop 4.34 以下没有 host network + # Linux 可以改 network_mode: host 省去映射,并把 livekit.yaml 的 webhook url 换成 127.0.0.1 + ports: + - "7880:7880" # HTTP / WebSocket 信令 + - "7881:7881" # WebRTC TCP fallback + - "7882:7882/udp" # WebRTC UDP (dev 模式 UDP mux 单端口) + volumes: + # 挂载 config 文件;webhook 配置在 livekit.yaml 里 + - ./livekit.yaml:/etc/livekit.yaml:ro + command: + - --config + - /etc/livekit.yaml + - --bind + - 0.0.0.0 diff --git a/script/livekit-poc/livekit.yaml b/script/livekit-poc/livekit.yaml new file mode 100644 index 0000000000..0dd255ebf2 --- /dev/null +++ b/script/livekit-poc/livekit.yaml @@ -0,0 +1,30 @@ +# LiveKit Server 本地开发配置(PoC 用,勿用于生产) +# 替代 docker --dev 模式;为支持 webhook 必须用 config 文件而非 env + +keys: + devkey: secret-poc-key-min-32-chars-required-here + +# 端口 +port: 7880 +rtc: + tcp_port: 7881 + udp_port: 7882 + use_external_ip: false + +# Webhook:成员离开 / 房间结束等事件回调到 yudao 后端做业务态兜底清理 +# host.docker.internal 让容器访问宿主机 macOS / Windows 上的 yudao 后端 +# Linux 上 docker compose 可改 network_mode: host,这里同步改成 127.0.0.1 +# api_key 用于签发 JWT,yudao 后端用相同 secret 验证签名 +webhook: + api_key: devkey + urls: + - http://host.docker.internal:48080/admin-api/im/livekit/webhook + +# 房间无人时多久销毁;秒;之前 PoC 默认 300,给低些方便排查 +room: + empty_timeout: 300 + departure_timeout: 20 + +# 开发模式:放宽 secret 长度限制 + 内置 TURN 服务 +development: true +log_level: info diff --git a/script/livekit-poc/verify.sh b/script/livekit-poc/verify.sh new file mode 100755 index 0000000000..9cf9a152ca --- /dev/null +++ b/script/livekit-poc/verify.sh @@ -0,0 +1,105 @@ +#!/usr/bin/env bash +# LiveKit Server PoC 验证脚本; +# 用法: bash verify.sh +set -e + +API_KEY="${LIVEKIT_API_KEY:-devkey}" +API_SECRET="${LIVEKIT_API_SECRET:-secret-poc-key-min-32-chars-required-here}" +HOST="${LIVEKIT_HOST:-localhost:7880}" +ROOM="${LIVEKIT_ROOM:-poc-room}" + +ok() { printf "[OK] %s\n" "$1"; } +fail() { printf "[FAIL] %s\n" "$1"; exit 1; } + +echo "==> 1/5 等待 HTTP 端点就绪 (http://${HOST}/)" +for i in $(seq 1 20); do + code=$(curl -s -o /dev/null -w "%{http_code}" "http://${HOST}/" || echo "000") + [ "$code" = "200" ] && { ok "HTTP 200"; break; } + [ $i -eq 20 ] && fail "20 秒内未就绪 (last code=${code})" + sleep 1 +done + +echo "==> 2/5 签发管理 + 客户端权限 Token" +TOKEN=$(API_KEY="$API_KEY" API_SECRET="$API_SECRET" ROOM="$ROOM" python3 - <<'PY' +import json, time, hmac, hashlib, base64, os +def b64u(b): return base64.urlsafe_b64encode(b).rstrip(b'=').decode() +header = b64u(json.dumps({"alg":"HS256","typ":"JWT"}, separators=(',',':')).encode()) +payload = b64u(json.dumps({ + "iss": os.environ["API_KEY"], + "sub": "poc-tester", + "name": "PoC Tester", + "video": { + "roomJoin": True, "room": os.environ["ROOM"], + "canPublish": True, "canSubscribe": True, "canPublishData": True, + "roomCreate": True, "roomList": True, "roomAdmin": True + }, + "exp": int(time.time()) + 3600, + "nbf": int(time.time()) +}, separators=(',',':')).encode()) +sig = b64u(hmac.new(os.environ["API_SECRET"].encode(), + f"{header}.{payload}".encode(), + hashlib.sha256).digest()) +print(f"{header}.{payload}.{sig}") +PY +) +[ -n "$TOKEN" ] || fail "Token 生成失败" +ok "Token 已生成 (${#TOKEN} chars)" + +echo "==> 3/5 创建房间 ${ROOM} (CreateRoom RPC)" +create_resp=$(curl -s -X POST "http://${HOST}/twirp/livekit.RoomService/CreateRoom" \ + -H "Authorization: Bearer ${TOKEN}" \ + -H "Content-Type: application/json" \ + -d "{\"name\":\"${ROOM}\",\"empty_timeout\":300,\"max_participants\":10}") +echo " 响应: $create_resp" +echo "$create_resp" | jq -e '.sid' >/dev/null 2>&1 \ + && ok "房间已创建" \ + || fail "CreateRoom 失败" + +echo "==> 4/5 列出房间 (ListRooms RPC)" +list_resp=$(curl -s -X POST "http://${HOST}/twirp/livekit.RoomService/ListRooms" \ + -H "Authorization: Bearer ${TOKEN}" \ + -H "Content-Type: application/json" \ + -d '{}') +room_count=$(echo "$list_resp" | jq '.rooms | length' 2>/dev/null || echo "0") +ok "当前房间数: ${room_count}" +echo "$list_resp" | jq '.' + +echo "==> 5/5 删除房间 (DeleteRoom RPC) —— 清理" +del_resp=$(curl -s -X POST "http://${HOST}/twirp/livekit.RoomService/DeleteRoom" \ + -H "Authorization: Bearer ${TOKEN}" \ + -H "Content-Type: application/json" \ + -d "{\"room\":\"${ROOM}\"}") +ok "删除响应: $del_resp" + +# 重新签一个仅 client 权限的 token;用于浏览器进会 +CLIENT_TOKEN=$(API_KEY="$API_KEY" API_SECRET="$API_SECRET" ROOM="$ROOM" python3 - <<'PY' +import json, time, hmac, hashlib, base64, os +def b64u(b): return base64.urlsafe_b64encode(b).rstrip(b'=').decode() +header = b64u(json.dumps({"alg":"HS256","typ":"JWT"}, separators=(',',':')).encode()) +payload = b64u(json.dumps({ + "iss": os.environ["API_KEY"], + "sub": "browser-tester", + "name": "Browser", + "video": { + "roomJoin": True, "room": os.environ["ROOM"], + "canPublish": True, "canSubscribe": True, "canPublishData": True + }, + "exp": int(time.time()) + 7200 +}, separators=(',',':')).encode()) +sig = b64u(hmac.new(os.environ["API_SECRET"].encode(), + f"{header}.{payload}".encode(), + hashlib.sha256).digest()) +print(f"{header}.{payload}.{sig}") +PY +) + +echo "" +echo "============================================================" +echo " LiveKit Server 验证通过" +echo "============================================================" +echo " 浏览器测试 (开两个窗口能互通):" +echo " https://meet.livekit.io/?liveKitUrl=ws%3A%2F%2F${HOST}&token=${CLIENT_TOKEN}" +echo "" +echo " 停止服务:" +echo " docker compose -f tools/livekit-poc/docker-compose.yml down" +echo "============================================================" diff --git a/yudao-module-im/pom.xml b/yudao-module-im/pom.xml index c64c05eed2..6fdb223c56 100644 --- a/yudao-module-im/pom.xml +++ b/yudao-module-im/pom.xml @@ -23,6 +23,11 @@ yudao-module-system ${revision} + + cn.iocoder.boot + yudao-module-infra + ${revision} + cn.iocoder.boot @@ -40,11 +45,6 @@ yudao-spring-boot-starter-security - - cn.iocoder.boot - yudao-spring-boot-starter-websocket - - cn.iocoder.boot diff --git a/yudao-module-im/src/main/java/cn/iocoder/yudao/module/im/service/websocket/ImWebSocketServiceImpl.java b/yudao-module-im/src/main/java/cn/iocoder/yudao/module/im/service/websocket/ImWebSocketServiceImpl.java index c790c609a1..a7a1934c49 100644 --- a/yudao-module-im/src/main/java/cn/iocoder/yudao/module/im/service/websocket/ImWebSocketServiceImpl.java +++ b/yudao-module-im/src/main/java/cn/iocoder/yudao/module/im/service/websocket/ImWebSocketServiceImpl.java @@ -2,10 +2,10 @@ package cn.iocoder.yudao.module.im.service.websocket; import cn.hutool.extra.spring.SpringUtil; import cn.iocoder.yudao.framework.common.enums.UserTypeEnum; -import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender; import cn.iocoder.yudao.module.im.service.websocket.dto.ImChannelMessageDTO; import cn.iocoder.yudao.module.im.service.websocket.dto.ImGroupMessageDTO; import cn.iocoder.yudao.module.im.service.websocket.dto.ImPrivateMessageDTO; +import cn.iocoder.yudao.module.infra.api.websocket.WebSocketSenderApi; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; @@ -33,7 +33,7 @@ import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils. public class ImWebSocketServiceImpl implements ImWebSocketService { @Resource - private WebSocketMessageSender webSocketMessageSender; + private WebSocketSenderApi webSocketSenderApi; @Override public void sendPrivateMessageAsync(Collection userIds, ImPrivateMessageDTO dto) { @@ -64,7 +64,7 @@ public class ImWebSocketServiceImpl implements ImWebSocketService { public void doSendPrivateMessage(Collection userIds, ImPrivateMessageDTO dto) { for (Long userId : getDistinctUserIds(userIds)) { try { - webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), userId, + webSocketSenderApi.sendObject(UserTypeEnum.ADMIN.getValue(), userId, ImPrivateMessageDTO.TYPE, dto); } catch (Exception e) { log.error("[doSendPrivateMessage][userId({}) dto({}) 发送失败]", userId, dto, e); @@ -79,7 +79,7 @@ public class ImWebSocketServiceImpl implements ImWebSocketService { public void doSendGroupMessage(Collection userIds, ImGroupMessageDTO dto) { for (Long userId : getDistinctUserIds(userIds)) { try { - webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), userId, + webSocketSenderApi.sendObject(UserTypeEnum.ADMIN.getValue(), userId, ImGroupMessageDTO.TYPE, dto); } catch (Exception e) { log.error("[doSendGroupMessage][userId({}) dto({}) 发送失败]", userId, dto, e); @@ -94,7 +94,7 @@ public class ImWebSocketServiceImpl implements ImWebSocketService { public void doSendChannelMessage(Collection userIds, ImChannelMessageDTO dto) { for (Long userId : getDistinctUserIds(userIds)) { try { - webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), userId, + webSocketSenderApi.sendObject(UserTypeEnum.ADMIN.getValue(), userId, ImChannelMessageDTO.TYPE, dto); } catch (Exception e) { log.error("[doSendChannelMessage][userId({}) dto({}) 发送失败]", userId, dto, e); @@ -104,12 +104,12 @@ public class ImWebSocketServiceImpl implements ImWebSocketService { /** * 异步广播频道 WebSocket 消息给当前所有在线管理端用户; - * 依赖 WebSocketMessageSender 按 UserType 广播能力,离线用户由客户端上线 pull 兜底 + * 依赖 infra WebSocketSenderApi 按 UserType 广播能力,离线用户由客户端上线 pull 兜底 */ @Async public void doBroadcastChannelMessage(ImChannelMessageDTO dto) { try { - webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), + webSocketSenderApi.sendObject(UserTypeEnum.ADMIN.getValue(), ImChannelMessageDTO.TYPE, dto); } catch (Exception e) { log.error("[doBroadcastChannelMessage][dto({}) 广播失败]", dto, e); diff --git a/yudao-module-im/src/main/java/cn/iocoder/yudao/module/im/util/ImMessageUtils.java b/yudao-module-im/src/main/java/cn/iocoder/yudao/module/im/util/ImMessageUtils.java index 255a1e95e2..1ff958a52f 100644 --- a/yudao-module-im/src/main/java/cn/iocoder/yudao/module/im/util/ImMessageUtils.java +++ b/yudao-module-im/src/main/java/cn/iocoder/yudao/module/im/util/ImMessageUtils.java @@ -87,6 +87,7 @@ public class ImMessageUtils { validateNotBlank(getString(map, "name")); } + @SuppressWarnings("PatternVariableCanBeUsed") private static void validateMergeContent(Map map) { validateNotBlank(getString(map, "title")); Object messages = map.get("messages"); diff --git a/yudao-module-im/src/test/java/cn/iocoder/yudao/module/im/service/websocket/ImWebSocketServiceImplTest.java b/yudao-module-im/src/test/java/cn/iocoder/yudao/module/im/service/websocket/ImWebSocketServiceImplTest.java index ba11735cef..df2125a7be 100644 --- a/yudao-module-im/src/test/java/cn/iocoder/yudao/module/im/service/websocket/ImWebSocketServiceImplTest.java +++ b/yudao-module-im/src/test/java/cn/iocoder/yudao/module/im/service/websocket/ImWebSocketServiceImplTest.java @@ -4,9 +4,9 @@ import cn.hutool.core.collection.ListUtil; import cn.hutool.extra.spring.SpringUtil; import cn.iocoder.yudao.framework.common.enums.UserTypeEnum; import cn.iocoder.yudao.framework.test.core.ut.BaseMockitoUnitTest; -import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender; import cn.iocoder.yudao.module.im.service.websocket.dto.ImGroupMessageDTO; import cn.iocoder.yudao.module.im.service.websocket.dto.ImPrivateMessageDTO; +import cn.iocoder.yudao.module.infra.api.websocket.WebSocketSenderApi; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.mockito.InjectMocks; @@ -34,7 +34,7 @@ public class ImWebSocketServiceImplTest extends BaseMockitoUnitTest { private ImWebSocketServiceImpl imWebSocketService; @Mock - private WebSocketMessageSender webSocketMessageSender; + private WebSocketSenderApi webSocketSenderApi; @AfterEach public void tearDown() { @@ -59,7 +59,7 @@ public class ImWebSocketServiceImplTest extends BaseMockitoUnitTest { imWebSocketService.sendPrivateMessageAsync(2L, dto); // 断言 - verify(webSocketMessageSender).sendObject( + verify(webSocketSenderApi).sendObject( eq(UserTypeEnum.ADMIN.getValue()), eq(2L), eq(ImPrivateMessageDTO.TYPE), eq(dto)); } } @@ -79,7 +79,7 @@ public class ImWebSocketServiceImplTest extends BaseMockitoUnitTest { imWebSocketService.sendPrivateMessageAsync(2L, dto); // 断言:事务未提交,未推送 - verify(webSocketMessageSender, never()).sendObject(anyInt(), anyLong(), anyString(), any()); + verify(webSocketSenderApi, never()).sendObject(anyInt(), anyLong(), anyString(), any()); // 模拟事务提交 List syncs = @@ -88,7 +88,7 @@ public class ImWebSocketServiceImplTest extends BaseMockitoUnitTest { syncs.forEach(TransactionSynchronization::afterCommit); // 断言:提交后推送 - verify(webSocketMessageSender).sendObject( + verify(webSocketSenderApi).sendObject( eq(UserTypeEnum.ADMIN.getValue()), eq(2L), eq(ImPrivateMessageDTO.TYPE), eq(dto)); } finally { TransactionSynchronizationManager.clear(); @@ -110,11 +110,11 @@ public class ImWebSocketServiceImplTest extends BaseMockitoUnitTest { imWebSocketService.sendGroupMessageAsync(ListUtil.of(1L, 2L, 3L), dto); - verify(webSocketMessageSender).sendObject( + verify(webSocketSenderApi).sendObject( eq(UserTypeEnum.ADMIN.getValue()), eq(1L), eq(ImGroupMessageDTO.TYPE), eq(dto)); - verify(webSocketMessageSender).sendObject( + verify(webSocketSenderApi).sendObject( eq(UserTypeEnum.ADMIN.getValue()), eq(2L), eq(ImGroupMessageDTO.TYPE), eq(dto)); - verify(webSocketMessageSender).sendObject( + verify(webSocketSenderApi).sendObject( eq(UserTypeEnum.ADMIN.getValue()), eq(3L), eq(ImGroupMessageDTO.TYPE), eq(dto)); } } @@ -129,13 +129,13 @@ public class ImWebSocketServiceImplTest extends BaseMockitoUnitTest { dto.setGroupId(10L); // 给 1 号用户推送时抛异常,不能影响 2/3 号 doThrow(new RuntimeException("user offline")) - .when(webSocketMessageSender).sendObject(anyInt(), eq(1L), anyString(), any()); + .when(webSocketSenderApi).sendObject(anyInt(), eq(1L), anyString(), any()); imWebSocketService.sendGroupMessageAsync(ListUtil.of(1L, 2L, 3L), dto); // 2L 和 3L 也都被推送 - verify(webSocketMessageSender).sendObject(anyInt(), eq(2L), anyString(), any()); - verify(webSocketMessageSender).sendObject(anyInt(), eq(3L), anyString(), any()); + verify(webSocketSenderApi).sendObject(anyInt(), eq(2L), anyString(), any()); + verify(webSocketSenderApi).sendObject(anyInt(), eq(3L), anyString(), any()); } } @@ -146,7 +146,7 @@ public class ImWebSocketServiceImplTest extends BaseMockitoUnitTest { imWebSocketService.doSendGroupMessage(Collections.emptyList(), dto); imWebSocketService.doSendGroupMessage(null, dto); - verifyNoInteractions(webSocketMessageSender); + verifyNoInteractions(webSocketSenderApi); } @Test @@ -155,11 +155,11 @@ public class ImWebSocketServiceImplTest extends BaseMockitoUnitTest { imWebSocketService.doSendGroupMessage(Arrays.asList(1L, 2L, 1L, null), dto); - verify(webSocketMessageSender).sendObject( + verify(webSocketSenderApi).sendObject( eq(UserTypeEnum.ADMIN.getValue()), eq(1L), eq(ImGroupMessageDTO.TYPE), eq(dto)); - verify(webSocketMessageSender).sendObject( + verify(webSocketSenderApi).sendObject( eq(UserTypeEnum.ADMIN.getValue()), eq(2L), eq(ImGroupMessageDTO.TYPE), eq(dto)); - verifyNoMoreInteractions(webSocketMessageSender); + verifyNoMoreInteractions(webSocketSenderApi); } @Test @@ -171,12 +171,12 @@ public class ImWebSocketServiceImplTest extends BaseMockitoUnitTest { // 准备:sender 抛异常 ImPrivateMessageDTO dto = new ImPrivateMessageDTO().setSenderId(1L).setReceiverId(2L); doThrow(new RuntimeException("user offline")) - .when(webSocketMessageSender).sendObject(anyInt(), anyLong(), anyString(), any()); + .when(webSocketSenderApi).sendObject(anyInt(), anyLong(), anyString(), any()); // 调用:异常应被吞掉,不向上抛 imWebSocketService.sendPrivateMessageAsync(2L, dto); - verify(webSocketMessageSender).sendObject(anyInt(), eq(2L), anyString(), any()); + verify(webSocketSenderApi).sendObject(anyInt(), eq(2L), anyString(), any()); } } @@ -191,7 +191,7 @@ public class ImWebSocketServiceImplTest extends BaseMockitoUnitTest { imWebSocketService.sendGroupMessageAsync(42L, dto); - verify(webSocketMessageSender).sendObject( + verify(webSocketSenderApi).sendObject( eq(UserTypeEnum.ADMIN.getValue()), eq(42L), eq(ImGroupMessageDTO.TYPE), eq(dto)); } }