Merge remote-tracking branch 'origin/master-jdk17'

# Conflicts:
#	.DS_Store
#	yudao-module-im/src/main/java/cn/iocoder/yudao/module/im/service/websocket/ImWebSocketServiceImpl.java
This commit is contained in:
YunaiV
2026-06-01 00:48:35 +08:00
10 changed files with 219 additions and 30 deletions

BIN
.DS_Store vendored

Binary file not shown.

1
.gitignore vendored
View File

@ -51,4 +51,5 @@ rebel.xml
application-my.yaml
/yudao-ui-app/unpackage/
.DS_Store
**/.DS_Store

View File

@ -0,0 +1,32 @@
# LiveKit Server PoC
最小可用的 LiveKit Server 自部署验证环境,用于零期 PoC。
## 启动
```bash
cd tools/livekit-poc
docker compose up -d
bash verify.sh
```
## 端口
- 7880HTTP / WebSocket 信令;
- 7881WebRTC TCP fallback
- 7882/UDPWebRTC 媒体;
- macOS / Windows当前 `docker-compose.yml` 走端口映射模式webhook URL 用 `host.docker.internal:48080` 让容器访问到宿主机 yudao 后端;
- macOS 上 host network`network_mode: host`)需要 Docker Desktop 4.34+ 并在 Settings → Resources → Network 勾选「Enable host networking」老版本静默失败容器跑得起来但端口完全不通
- Linux可以把 `docker-compose.yml` 改成 `network_mode: host` + 删 `ports:` 段,并把 `livekit.yaml` 的 webhook URL 改为 `http://127.0.0.1:48080/admin-api/im/livekit/webhook`
## 凭据 (仅 PoC勿用于生产)
- `LIVEKIT_KEYS=devkey: secret-poc-key-min-32-chars-required-here`
- API Key`devkey`
- API Secret`secret-poc-key-min-32-chars-required-here`
生产环境必须改用强随机 secret并通过 `--config /etc/livekit.yaml` 加载。
## 浏览器联调
`verify.sh` 跑完会输出一个 `meet.livekit.io` 链接,用两个浏览器(或两台机器)打开同一链接即可看到对方画面。

View File

@ -0,0 +1,20 @@
services:
livekit:
image: docker.m.daocloud.io/livekit/livekit-server:latest
container_name: yudao-livekit-dev
restart: unless-stopped
# 端口映射模式
# macOS / Windows 必走这种方式Docker Desktop 4.34 以下没有 host network
# Linux 可以改 network_mode: host 省去映射,并把 livekit.yaml 的 webhook url 换成 127.0.0.1
ports:
- "7880:7880" # HTTP / WebSocket 信令
- "7881:7881" # WebRTC TCP fallback
- "7882:7882/udp" # WebRTC UDP (dev 模式 UDP mux 单端口)
volumes:
# 挂载 config 文件webhook 配置在 livekit.yaml 里
- ./livekit.yaml:/etc/livekit.yaml:ro
command:
- --config
- /etc/livekit.yaml
- --bind
- 0.0.0.0

View File

@ -0,0 +1,30 @@
# LiveKit Server 本地开发配置PoC 用,勿用于生产)
# 替代 docker --dev 模式;为支持 webhook 必须用 config 文件而非 env
keys:
devkey: secret-poc-key-min-32-chars-required-here
# 端口
port: 7880
rtc:
tcp_port: 7881
udp_port: 7882
use_external_ip: false
# Webhook成员离开 / 房间结束等事件回调到 yudao 后端做业务态兜底清理
# host.docker.internal 让容器访问宿主机 macOS / Windows 上的 yudao 后端
# Linux 上 docker compose 可改 network_mode: host这里同步改成 127.0.0.1
# api_key 用于签发 JWTyudao 后端用相同 secret 验证签名
webhook:
api_key: devkey
urls:
- http://host.docker.internal:48080/admin-api/im/livekit/webhook
# 房间无人时多久销毁;秒;之前 PoC 默认 300给低些方便排查
room:
empty_timeout: 300
departure_timeout: 20
# 开发模式:放宽 secret 长度限制 + 内置 TURN 服务
development: true
log_level: info

105
script/livekit-poc/verify.sh Executable file
View File

@ -0,0 +1,105 @@
#!/usr/bin/env bash
# LiveKit Server PoC 验证脚本;
# 用法: bash verify.sh
set -e
API_KEY="${LIVEKIT_API_KEY:-devkey}"
API_SECRET="${LIVEKIT_API_SECRET:-secret-poc-key-min-32-chars-required-here}"
HOST="${LIVEKIT_HOST:-localhost:7880}"
ROOM="${LIVEKIT_ROOM:-poc-room}"
ok() { printf "[OK] %s\n" "$1"; }
fail() { printf "[FAIL] %s\n" "$1"; exit 1; }
echo "==> 1/5 等待 HTTP 端点就绪 (http://${HOST}/)"
for i in $(seq 1 20); do
code=$(curl -s -o /dev/null -w "%{http_code}" "http://${HOST}/" || echo "000")
[ "$code" = "200" ] && { ok "HTTP 200"; break; }
[ $i -eq 20 ] && fail "20 秒内未就绪 (last code=${code})"
sleep 1
done
echo "==> 2/5 签发管理 + 客户端权限 Token"
TOKEN=$(API_KEY="$API_KEY" API_SECRET="$API_SECRET" ROOM="$ROOM" python3 - <<'PY'
import json, time, hmac, hashlib, base64, os
def b64u(b): return base64.urlsafe_b64encode(b).rstrip(b'=').decode()
header = b64u(json.dumps({"alg":"HS256","typ":"JWT"}, separators=(',',':')).encode())
payload = b64u(json.dumps({
"iss": os.environ["API_KEY"],
"sub": "poc-tester",
"name": "PoC Tester",
"video": {
"roomJoin": True, "room": os.environ["ROOM"],
"canPublish": True, "canSubscribe": True, "canPublishData": True,
"roomCreate": True, "roomList": True, "roomAdmin": True
},
"exp": int(time.time()) + 3600,
"nbf": int(time.time())
}, separators=(',',':')).encode())
sig = b64u(hmac.new(os.environ["API_SECRET"].encode(),
f"{header}.{payload}".encode(),
hashlib.sha256).digest())
print(f"{header}.{payload}.{sig}")
PY
)
[ -n "$TOKEN" ] || fail "Token 生成失败"
ok "Token 已生成 (${#TOKEN} chars)"
echo "==> 3/5 创建房间 ${ROOM} (CreateRoom RPC)"
create_resp=$(curl -s -X POST "http://${HOST}/twirp/livekit.RoomService/CreateRoom" \
-H "Authorization: Bearer ${TOKEN}" \
-H "Content-Type: application/json" \
-d "{\"name\":\"${ROOM}\",\"empty_timeout\":300,\"max_participants\":10}")
echo " 响应: $create_resp"
echo "$create_resp" | jq -e '.sid' >/dev/null 2>&1 \
&& ok "房间已创建" \
|| fail "CreateRoom 失败"
echo "==> 4/5 列出房间 (ListRooms RPC)"
list_resp=$(curl -s -X POST "http://${HOST}/twirp/livekit.RoomService/ListRooms" \
-H "Authorization: Bearer ${TOKEN}" \
-H "Content-Type: application/json" \
-d '{}')
room_count=$(echo "$list_resp" | jq '.rooms | length' 2>/dev/null || echo "0")
ok "当前房间数: ${room_count}"
echo "$list_resp" | jq '.'
echo "==> 5/5 删除房间 (DeleteRoom RPC) —— 清理"
del_resp=$(curl -s -X POST "http://${HOST}/twirp/livekit.RoomService/DeleteRoom" \
-H "Authorization: Bearer ${TOKEN}" \
-H "Content-Type: application/json" \
-d "{\"room\":\"${ROOM}\"}")
ok "删除响应: $del_resp"
# 重新签一个仅 client 权限的 token用于浏览器进会
CLIENT_TOKEN=$(API_KEY="$API_KEY" API_SECRET="$API_SECRET" ROOM="$ROOM" python3 - <<'PY'
import json, time, hmac, hashlib, base64, os
def b64u(b): return base64.urlsafe_b64encode(b).rstrip(b'=').decode()
header = b64u(json.dumps({"alg":"HS256","typ":"JWT"}, separators=(',',':')).encode())
payload = b64u(json.dumps({
"iss": os.environ["API_KEY"],
"sub": "browser-tester",
"name": "Browser",
"video": {
"roomJoin": True, "room": os.environ["ROOM"],
"canPublish": True, "canSubscribe": True, "canPublishData": True
},
"exp": int(time.time()) + 7200
}, separators=(',',':')).encode())
sig = b64u(hmac.new(os.environ["API_SECRET"].encode(),
f"{header}.{payload}".encode(),
hashlib.sha256).digest())
print(f"{header}.{payload}.{sig}")
PY
)
echo ""
echo "============================================================"
echo " LiveKit Server 验证通过"
echo "============================================================"
echo " 浏览器测试 (开两个窗口能互通)"
echo " https://meet.livekit.io/?liveKitUrl=ws%3A%2F%2F${HOST}&token=${CLIENT_TOKEN}"
echo ""
echo " 停止服务:"
echo " docker compose -f tools/livekit-poc/docker-compose.yml down"
echo "============================================================"

View File

@ -23,6 +23,11 @@
<artifactId>yudao-module-system</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-module-infra</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>cn.iocoder.boot</groupId>
@ -40,11 +45,6 @@
<artifactId>yudao-spring-boot-starter-security</artifactId>
</dependency>
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-spring-boot-starter-websocket</artifactId>
</dependency>
<!-- DB 相关 -->
<dependency>
<groupId>cn.iocoder.boot</groupId>

View File

@ -2,10 +2,10 @@ package cn.iocoder.yudao.module.im.service.websocket;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.framework.common.enums.UserTypeEnum;
import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender;
import cn.iocoder.yudao.module.im.service.websocket.dto.ImChannelMessageDTO;
import cn.iocoder.yudao.module.im.service.websocket.dto.ImGroupMessageDTO;
import cn.iocoder.yudao.module.im.service.websocket.dto.ImPrivateMessageDTO;
import cn.iocoder.yudao.module.infra.api.websocket.WebSocketSenderApi;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
@ -33,7 +33,7 @@ import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.
public class ImWebSocketServiceImpl implements ImWebSocketService {
@Resource
private WebSocketMessageSender webSocketMessageSender;
private WebSocketSenderApi webSocketSenderApi;
@Override
public void sendPrivateMessageAsync(Collection<Long> userIds, ImPrivateMessageDTO dto) {
@ -64,7 +64,7 @@ public class ImWebSocketServiceImpl implements ImWebSocketService {
public void doSendPrivateMessage(Collection<Long> userIds, ImPrivateMessageDTO dto) {
for (Long userId : getDistinctUserIds(userIds)) {
try {
webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), userId,
webSocketSenderApi.sendObject(UserTypeEnum.ADMIN.getValue(), userId,
ImPrivateMessageDTO.TYPE, dto);
} catch (Exception e) {
log.error("[doSendPrivateMessage][userId({}) dto({}) 发送失败]", userId, dto, e);
@ -79,7 +79,7 @@ public class ImWebSocketServiceImpl implements ImWebSocketService {
public void doSendGroupMessage(Collection<Long> userIds, ImGroupMessageDTO dto) {
for (Long userId : getDistinctUserIds(userIds)) {
try {
webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), userId,
webSocketSenderApi.sendObject(UserTypeEnum.ADMIN.getValue(), userId,
ImGroupMessageDTO.TYPE, dto);
} catch (Exception e) {
log.error("[doSendGroupMessage][userId({}) dto({}) 发送失败]", userId, dto, e);
@ -94,7 +94,7 @@ public class ImWebSocketServiceImpl implements ImWebSocketService {
public void doSendChannelMessage(Collection<Long> userIds, ImChannelMessageDTO dto) {
for (Long userId : getDistinctUserIds(userIds)) {
try {
webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), userId,
webSocketSenderApi.sendObject(UserTypeEnum.ADMIN.getValue(), userId,
ImChannelMessageDTO.TYPE, dto);
} catch (Exception e) {
log.error("[doSendChannelMessage][userId({}) dto({}) 发送失败]", userId, dto, e);
@ -104,12 +104,12 @@ public class ImWebSocketServiceImpl implements ImWebSocketService {
/**
* 异步广播频道 WebSocket 消息给当前所有在线管理端用户;
* 依赖 WebSocketMessageSender 按 UserType 广播能力,离线用户由客户端上线 pull 兜底
* 依赖 infra WebSocketSenderApi 按 UserType 广播能力,离线用户由客户端上线 pull 兜底
*/
@Async
public void doBroadcastChannelMessage(ImChannelMessageDTO dto) {
try {
webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(),
webSocketSenderApi.sendObject(UserTypeEnum.ADMIN.getValue(),
ImChannelMessageDTO.TYPE, dto);
} catch (Exception e) {
log.error("[doBroadcastChannelMessage][dto({}) 广播失败]", dto, e);

View File

@ -87,6 +87,7 @@ public class ImMessageUtils {
validateNotBlank(getString(map, "name"));
}
@SuppressWarnings("PatternVariableCanBeUsed")
private static void validateMergeContent(Map<String, Object> map) {
validateNotBlank(getString(map, "title"));
Object messages = map.get("messages");

View File

@ -4,9 +4,9 @@ import cn.hutool.core.collection.ListUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.framework.common.enums.UserTypeEnum;
import cn.iocoder.yudao.framework.test.core.ut.BaseMockitoUnitTest;
import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender;
import cn.iocoder.yudao.module.im.service.websocket.dto.ImGroupMessageDTO;
import cn.iocoder.yudao.module.im.service.websocket.dto.ImPrivateMessageDTO;
import cn.iocoder.yudao.module.infra.api.websocket.WebSocketSenderApi;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.mockito.InjectMocks;
@ -34,7 +34,7 @@ public class ImWebSocketServiceImplTest extends BaseMockitoUnitTest {
private ImWebSocketServiceImpl imWebSocketService;
@Mock
private WebSocketMessageSender webSocketMessageSender;
private WebSocketSenderApi webSocketSenderApi;
@AfterEach
public void tearDown() {
@ -59,7 +59,7 @@ public class ImWebSocketServiceImplTest extends BaseMockitoUnitTest {
imWebSocketService.sendPrivateMessageAsync(2L, dto);
// 断言
verify(webSocketMessageSender).sendObject(
verify(webSocketSenderApi).sendObject(
eq(UserTypeEnum.ADMIN.getValue()), eq(2L), eq(ImPrivateMessageDTO.TYPE), eq(dto));
}
}
@ -79,7 +79,7 @@ public class ImWebSocketServiceImplTest extends BaseMockitoUnitTest {
imWebSocketService.sendPrivateMessageAsync(2L, dto);
// 断言:事务未提交,未推送
verify(webSocketMessageSender, never()).sendObject(anyInt(), anyLong(), anyString(), any());
verify(webSocketSenderApi, never()).sendObject(anyInt(), anyLong(), anyString(), any());
// 模拟事务提交
List<TransactionSynchronization> syncs =
@ -88,7 +88,7 @@ public class ImWebSocketServiceImplTest extends BaseMockitoUnitTest {
syncs.forEach(TransactionSynchronization::afterCommit);
// 断言:提交后推送
verify(webSocketMessageSender).sendObject(
verify(webSocketSenderApi).sendObject(
eq(UserTypeEnum.ADMIN.getValue()), eq(2L), eq(ImPrivateMessageDTO.TYPE), eq(dto));
} finally {
TransactionSynchronizationManager.clear();
@ -110,11 +110,11 @@ public class ImWebSocketServiceImplTest extends BaseMockitoUnitTest {
imWebSocketService.sendGroupMessageAsync(ListUtil.of(1L, 2L, 3L), dto);
verify(webSocketMessageSender).sendObject(
verify(webSocketSenderApi).sendObject(
eq(UserTypeEnum.ADMIN.getValue()), eq(1L), eq(ImGroupMessageDTO.TYPE), eq(dto));
verify(webSocketMessageSender).sendObject(
verify(webSocketSenderApi).sendObject(
eq(UserTypeEnum.ADMIN.getValue()), eq(2L), eq(ImGroupMessageDTO.TYPE), eq(dto));
verify(webSocketMessageSender).sendObject(
verify(webSocketSenderApi).sendObject(
eq(UserTypeEnum.ADMIN.getValue()), eq(3L), eq(ImGroupMessageDTO.TYPE), eq(dto));
}
}
@ -129,13 +129,13 @@ public class ImWebSocketServiceImplTest extends BaseMockitoUnitTest {
dto.setGroupId(10L);
// 给 1 号用户推送时抛异常,不能影响 2/3 号
doThrow(new RuntimeException("user offline"))
.when(webSocketMessageSender).sendObject(anyInt(), eq(1L), anyString(), any());
.when(webSocketSenderApi).sendObject(anyInt(), eq(1L), anyString(), any());
imWebSocketService.sendGroupMessageAsync(ListUtil.of(1L, 2L, 3L), dto);
// 2L 和 3L 也都被推送
verify(webSocketMessageSender).sendObject(anyInt(), eq(2L), anyString(), any());
verify(webSocketMessageSender).sendObject(anyInt(), eq(3L), anyString(), any());
verify(webSocketSenderApi).sendObject(anyInt(), eq(2L), anyString(), any());
verify(webSocketSenderApi).sendObject(anyInt(), eq(3L), anyString(), any());
}
}
@ -146,7 +146,7 @@ public class ImWebSocketServiceImplTest extends BaseMockitoUnitTest {
imWebSocketService.doSendGroupMessage(Collections.emptyList(), dto);
imWebSocketService.doSendGroupMessage(null, dto);
verifyNoInteractions(webSocketMessageSender);
verifyNoInteractions(webSocketSenderApi);
}
@Test
@ -155,11 +155,11 @@ public class ImWebSocketServiceImplTest extends BaseMockitoUnitTest {
imWebSocketService.doSendGroupMessage(Arrays.asList(1L, 2L, 1L, null), dto);
verify(webSocketMessageSender).sendObject(
verify(webSocketSenderApi).sendObject(
eq(UserTypeEnum.ADMIN.getValue()), eq(1L), eq(ImGroupMessageDTO.TYPE), eq(dto));
verify(webSocketMessageSender).sendObject(
verify(webSocketSenderApi).sendObject(
eq(UserTypeEnum.ADMIN.getValue()), eq(2L), eq(ImGroupMessageDTO.TYPE), eq(dto));
verifyNoMoreInteractions(webSocketMessageSender);
verifyNoMoreInteractions(webSocketSenderApi);
}
@Test
@ -171,12 +171,12 @@ public class ImWebSocketServiceImplTest extends BaseMockitoUnitTest {
// 准备sender 抛异常
ImPrivateMessageDTO dto = new ImPrivateMessageDTO().setSenderId(1L).setReceiverId(2L);
doThrow(new RuntimeException("user offline"))
.when(webSocketMessageSender).sendObject(anyInt(), anyLong(), anyString(), any());
.when(webSocketSenderApi).sendObject(anyInt(), anyLong(), anyString(), any());
// 调用:异常应被吞掉,不向上抛
imWebSocketService.sendPrivateMessageAsync(2L, dto);
verify(webSocketMessageSender).sendObject(anyInt(), eq(2L), anyString(), any());
verify(webSocketSenderApi).sendObject(anyInt(), eq(2L), anyString(), any());
}
}
@ -191,7 +191,7 @@ public class ImWebSocketServiceImplTest extends BaseMockitoUnitTest {
imWebSocketService.sendGroupMessageAsync(42L, dto);
verify(webSocketMessageSender).sendObject(
verify(webSocketSenderApi).sendObject(
eq(UserTypeEnum.ADMIN.getValue()), eq(42L), eq(ImGroupMessageDTO.TYPE), eq(dto));
}
}