mirror of
https://gitee.com/johng/gf
synced 2026-06-06 02:25:47 +08:00
### 变更说明
本次重构将项目中用于**实例管理的容器**从 `StrAnyMap`/`IntAnyMap` 迁移到类型安全的泛型实现
`KVMapWithChecker`,同时将相关的 `glist.List` 和 `gqueue.Queue`
替换为对应的泛型版本,以提高实例管理的类型安全性。并且减少原先代码中的大量类型断言,提高性能。
### 前因
目前 `goframe` 中大量使用了元素类型为 `any` 的容器,取值后需要通过类型断言转换类型,既繁琐又影响性能,对 `gdb`/`gredis`/`glog` 等需要高频获取 `instance` 实例的组件影响尤其明显。最近几个版本中 gf 已完成数据结构容器的泛型化改造,我最近也解决了其中几个泛型容器对 `typed nil` 过滤的问题,因此可以逐步将这些实例容器迁移到泛型容器,减少类型断言、优化性能。
### 主要改进
#### 1. 实例容器泛型化
以下模块的实例管理容器已迁移到泛型实现:
**核心实例管理**:
- `database/gdb`: 数据库实例容器 → `KVMap[string, DB]`
- `database/gredis`: Redis 实例容器 → `KVMap[string, *Redis]`
- `database/gredis`: Redis 配置容器 → `KVMap[string, *Config]`
- `os/gcfg`: 配置实例容器 → `KVMap[string, *Config]`
- `os/glog`: 日志实例容器 → `KVMap[string, *Logger]`
- `os/gview`: 视图实例容器 → `KVMap[string, *View]`
- `i18n/gi18n`: 国际化实例容器 → `KVMap[string, *Manager]`
**网络服务实例**:
- `net/ghttp`: HTTP 服务器容器 → `KVMap[string, *Server]`
- `net/gtcp`: TCP 服务器容器 → `KVMap[any, *Server]`
- `net/gudp`: UDP 服务器容器 → `KVMap[string, *Server]`
**其他实例容器**:
- `os/gres`: 资源实例容器 → `KVMap[string, *Resource]`
- `os/gfpool`: 文件池容器 → `KVMap[string, *Pool]`
- `os/gspath`: 路径搜索容器 → `KVMap[string, *SPath]`
- `net/gtcp`: 连接池容器 → `KVMap[string, *gpool.Pool]`
#### 2. 相关数据结构泛型化
- `os/gfsnotify`: 回调列表 → `TList[*Callback]`,事件队列 → `TQueue[*Event]`
- `os/grpool`: 任务队列 → `TList[*localPoolItem]`
- `os/gcache`: 事件队列 → `TList[*adapterMemoryEvent]`
- `net/ghttp`: 解析项列表 → `TList[*HandlerItemParsed]`
- `os/gproc`: 消息队列 → `TQueue[*MsgRequest]`
- `os/gmlock`: 锁映射 → `KVMap[string, *sync.RWMutex]`
### 技术实现
1. **引入检查器函数**: 为每个实例容器添加 `checker` 函数用于空值检测
2. **消除类型断言**: 实例获取时无需 `v.(*Type)` 转换
3. **明确函数签名**: `GetOrSetFuncLock` 的回调从 `func() any` 改为 `func() T`
### 使用示例
#### 实例容器的变更
**变更前**:
```go
// 旧的实例管理方式
var instances = gmap.NewStrAnyMap(true)
func Instance(name string) *Logger {
v := instances.GetOrSetFuncLock(name, func() any {
return New()
})
return v.(*Logger) // 需要类型断言
}
```
**变更后**:
```go
// 新的泛型实例容器
var (
checker = func(v *Logger) bool { return v == nil }
instances = gmap.NewKVMapWithChecker[string, *Logger](checker, true)
)
func Instance(name string) *Logger {
return instances.GetOrSetFuncLock(name, New) // 直接返回,无需断言
}
```
#### 队列容器的变更
**变更前**:
```go
// 旧的队列方式
events := gqueue.New()
events.Push(&Event{Path: "/tmp/file"})
if v := events.Pop(); v != nil {
event := v.(*Event) // 需要类型断言
handleEvent(event)
}
```
**变更后**:
```go
// 新的泛型队列
events := gqueue.NewTQueue[*Event]()
events.Push(&Event{Path: "/tmp/file"})
if event := events.Pop(); event != nil {
handleEvent(event) // event 已是 *Event 类型
}
```
### 收益
- ✅ **编译时类型安全**: 实例容器的类型错误在编译期捕获
- ✅ **消除运行时断言**: 避免类型断言带来的 panic 风险
- ✅ **提升代码可读性**: 实例管理逻辑更清晰
- ✅ **改善开发体验**: IDE 类型提示和代码补全更准确
### 性能权衡
**编译时**:
- 泛型实例化会增加编译时间和二进制体积
- 预估编译时间增加 5-15%,二进制体积增加约 1-2MB
**运行时**:
- 减少类型断言带来的运行时类型检查开销
- 提升实例获取等热点路径的性能
305 lines
8.4 KiB
Go
305 lines
8.4 KiB
Go
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
|
|
//
|
|
// This Source Code Form is subject to the terms of the MIT License.
|
|
// If a copy of the MIT was not distributed with this file,
|
|
// You can obtain one at https://github.com/gogf/gf.
|
|
|
|
package ghttp
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"runtime"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/gogf/gf/v2/container/gtype"
|
|
"github.com/gogf/gf/v2/encoding/gjson"
|
|
"github.com/gogf/gf/v2/errors/gcode"
|
|
"github.com/gogf/gf/v2/errors/gerror"
|
|
"github.com/gogf/gf/v2/internal/intlog"
|
|
"github.com/gogf/gf/v2/os/gfile"
|
|
"github.com/gogf/gf/v2/os/glog"
|
|
"github.com/gogf/gf/v2/os/gproc"
|
|
"github.com/gogf/gf/v2/os/gtime"
|
|
"github.com/gogf/gf/v2/os/gtimer"
|
|
"github.com/gogf/gf/v2/text/gstr"
|
|
"github.com/gogf/gf/v2/util/gconv"
|
|
)
|
|
|
|
const (
	// adminActionIntervalLimit is the minimum interval, in milliseconds, that must
	// separate two administration operations. The first operation is measured
	// against the initial value of serverActionLastTime.
	adminActionIntervalLimit = 2000

	// Values stored in serverProcessStatus.
	adminActionNone         = 0 // No administration operation in progress.
	adminActionRestarting   = 1 // A restart has been triggered.
	adminActionShuttingDown = 2 // A shutdown has been triggered.

	// adminActionReloadEnvKey is the environment variable that carries the
	// JSON-encoded listener fd map to the process forked for a reload.
	adminActionReloadEnvKey = "GF_SERVER_RELOAD"

	// adminActionRestartEnvKey is the environment variable set to "1" for the
	// process forked for a restart.
	adminActionRestartEnvKey = "GF_SERVER_RESTART"

	// adminGProcCommGroup is the gproc message group the server listens on for
	// inter-process commands.
	adminGProcCommGroup = "GF_GPROC_HTTP_SERVER"
)
|
|
|
|
var (
|
|
// serverActionLocker is the locker for server administration operations.
|
|
serverActionLocker sync.Mutex
|
|
|
|
// serverActionLastTime is timestamp in milliseconds of last administration operation.
|
|
serverActionLastTime = gtype.NewInt64(gtime.TimestampMilli())
|
|
|
|
// serverProcessStatus is the server status for operation of current process.
|
|
serverProcessStatus = gtype.NewInt()
|
|
)
|
|
|
|
// RestartAllServer restarts all the servers of the process gracefully.
|
|
// The optional parameter `newExeFilePath` specifies the new binary file for creating process.
|
|
func RestartAllServer(ctx context.Context, newExeFilePath string) error {
|
|
if !gracefulEnabled {
|
|
return gerror.NewCode(
|
|
gcode.CodeInvalidOperation,
|
|
"graceful reload feature is disabled",
|
|
)
|
|
}
|
|
serverActionLocker.Lock()
|
|
defer serverActionLocker.Unlock()
|
|
if err := checkProcessStatus(); err != nil {
|
|
return err
|
|
}
|
|
if err := checkActionFrequency(); err != nil {
|
|
return err
|
|
}
|
|
return restartWebServers(ctx, nil, newExeFilePath)
|
|
}
|
|
|
|
// ShutdownAllServer shuts down all servers of current process gracefully.
|
|
func ShutdownAllServer(ctx context.Context) error {
|
|
serverActionLocker.Lock()
|
|
defer serverActionLocker.Unlock()
|
|
if err := checkProcessStatus(); err != nil {
|
|
return err
|
|
}
|
|
if err := checkActionFrequency(); err != nil {
|
|
return err
|
|
}
|
|
shutdownWebServersGracefully(ctx, nil)
|
|
return nil
|
|
}
|
|
|
|
// checkProcessStatus checks the server status of current process.
|
|
func checkProcessStatus() error {
|
|
status := serverProcessStatus.Val()
|
|
if status > 0 {
|
|
switch status {
|
|
case adminActionRestarting:
|
|
return gerror.NewCode(gcode.CodeInvalidOperation, "server is restarting")
|
|
|
|
case adminActionShuttingDown:
|
|
return gerror.NewCode(gcode.CodeInvalidOperation, "server is shutting down")
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// checkActionFrequency checks the operation frequency.
|
|
// It returns error if it is too frequency.
|
|
func checkActionFrequency() error {
|
|
interval := gtime.TimestampMilli() - serverActionLastTime.Val()
|
|
if interval < adminActionIntervalLimit {
|
|
return gerror.NewCodef(
|
|
gcode.CodeInvalidOperation,
|
|
"too frequent action, please retry in %d ms",
|
|
adminActionIntervalLimit-interval,
|
|
)
|
|
}
|
|
serverActionLastTime.Set(gtime.TimestampMilli())
|
|
return nil
|
|
}
|
|
|
|
// forkReloadProcess creates a new child process and copies the fd to child process.
|
|
func forkReloadProcess(ctx context.Context, newExeFilePath ...string) error {
|
|
var (
|
|
binaryPath = os.Args[0]
|
|
)
|
|
if len(newExeFilePath) > 0 && newExeFilePath[0] != "" {
|
|
binaryPath = newExeFilePath[0]
|
|
}
|
|
if !gfile.Exists(binaryPath) {
|
|
return gerror.Newf(`binary file path "%s" does not exist`, binaryPath)
|
|
}
|
|
var (
|
|
p = gproc.NewProcess(binaryPath, os.Args[1:], os.Environ())
|
|
sfm = getServerFdMap()
|
|
)
|
|
for name, m := range sfm {
|
|
for fdk, fdv := range m {
|
|
if len(fdv) > 0 {
|
|
s := ""
|
|
for _, item := range gstr.SplitAndTrim(fdv, ",") {
|
|
array := strings.Split(item, "#")
|
|
fd := uintptr(gconv.Uint(array[1]))
|
|
if fd > 0 {
|
|
s += fmt.Sprintf("%s#%d,", array[0], 3+len(p.ExtraFiles))
|
|
p.ExtraFiles = append(p.ExtraFiles, os.NewFile(fd, ""))
|
|
} else {
|
|
s += fmt.Sprintf("%s#%d,", array[0], 0)
|
|
}
|
|
}
|
|
sfm[name][fdk] = strings.TrimRight(s, ",")
|
|
}
|
|
}
|
|
}
|
|
buffer, _ := gjson.Encode(sfm)
|
|
p.Env = append(p.Env, adminActionReloadEnvKey+"="+string(buffer))
|
|
if _, err := p.Start(ctx); err != nil {
|
|
intlog.Errorf(
|
|
ctx,
|
|
"%d: fork process failed, error: %s, %s",
|
|
gproc.Pid(), err.Error(), string(buffer),
|
|
)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// forkRestartProcess creates a new server process.
|
|
func forkRestartProcess(ctx context.Context, newExeFilePath ...string) error {
|
|
var (
|
|
path = os.Args[0]
|
|
)
|
|
if len(newExeFilePath) > 0 && newExeFilePath[0] != "" {
|
|
path = newExeFilePath[0]
|
|
}
|
|
if err := os.Unsetenv(adminActionReloadEnvKey); err != nil {
|
|
intlog.Errorf(ctx, `%+v`, err)
|
|
}
|
|
env := os.Environ()
|
|
env = append(env, adminActionRestartEnvKey+"=1")
|
|
p := gproc.NewProcess(path, os.Args[1:], env)
|
|
if _, err := p.Start(ctx); err != nil {
|
|
glog.Errorf(
|
|
ctx,
|
|
`%d: fork process failed, error:%s, are you running using "go run"?`,
|
|
gproc.Pid(), err.Error(),
|
|
)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// getServerFdMap returns all the servers name to file descriptor mapping as map.
|
|
func getServerFdMap() map[string]listenerFdMap {
|
|
sfm := make(map[string]listenerFdMap)
|
|
serverMapping.RLockFunc(func(m map[string]*Server) {
|
|
for k, v := range m {
|
|
sfm[k] = v.getListenerFdMap()
|
|
}
|
|
})
|
|
return sfm
|
|
}
|
|
|
|
// bufferToServerFdMap converts binary content to fd map.
|
|
func bufferToServerFdMap(buffer []byte) map[string]listenerFdMap {
|
|
sfm := make(map[string]listenerFdMap)
|
|
if len(buffer) > 0 {
|
|
j, _ := gjson.LoadContent(buffer)
|
|
for k := range j.Var().Map() {
|
|
m := make(map[string]string)
|
|
for mapKey, mapValue := range j.Get(k).MapStrStr() {
|
|
m[mapKey] = mapValue
|
|
}
|
|
sfm[k] = m
|
|
}
|
|
}
|
|
return sfm
|
|
}
|
|
|
|
// restartWebServers restarts all servers.
|
|
func restartWebServers(ctx context.Context, signal os.Signal, newExeFilePath string) error {
|
|
serverProcessStatus.Set(adminActionRestarting)
|
|
if runtime.GOOS == "windows" {
|
|
if signal != nil {
|
|
// Controlled by signal.
|
|
forceCloseWebServers(ctx)
|
|
if err := forkRestartProcess(ctx, newExeFilePath); err != nil {
|
|
intlog.Errorf(ctx, `%+v`, err)
|
|
}
|
|
return nil
|
|
}
|
|
// Controlled by web page.
|
|
// It should ensure the response wrote to client and then close all servers gracefully.
|
|
gtimer.SetTimeout(ctx, time.Second, func(ctx context.Context) {
|
|
forceCloseWebServers(ctx)
|
|
if err := forkRestartProcess(ctx, newExeFilePath); err != nil {
|
|
intlog.Errorf(ctx, `%+v`, err)
|
|
}
|
|
})
|
|
return nil
|
|
}
|
|
if err := forkReloadProcess(ctx, newExeFilePath); err != nil {
|
|
glog.Printf(ctx, "%d: server restarts failed", gproc.Pid())
|
|
serverProcessStatus.Set(adminActionNone)
|
|
return err
|
|
} else {
|
|
if signal != nil {
|
|
glog.Printf(ctx, "%d: server restarting by signal: %s", gproc.Pid(), signal)
|
|
} else {
|
|
glog.Printf(ctx, "%d: server restarting by web admin", gproc.Pid())
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// shutdownWebServersGracefully gracefully shuts down all servers.
|
|
func shutdownWebServersGracefully(ctx context.Context, signal os.Signal) {
|
|
serverProcessStatus.Set(adminActionShuttingDown)
|
|
if signal != nil {
|
|
glog.Printf(
|
|
ctx,
|
|
"%d: server gracefully shutting down by signal: %s",
|
|
gproc.Pid(), signal.String(),
|
|
)
|
|
} else {
|
|
glog.Printf(ctx, "pid[%d]: server gracefully shutting down by api", gproc.Pid())
|
|
}
|
|
serverMapping.RLockFunc(func(m map[string]*Server) {
|
|
for _, v := range m {
|
|
v.doServiceDeregister()
|
|
for _, s := range v.servers {
|
|
s.Shutdown(ctx)
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
// forceCloseWebServers forced shuts down all servers.
|
|
func forceCloseWebServers(ctx context.Context) {
|
|
serverMapping.RLockFunc(func(m map[string]*Server) {
|
|
for _, v := range m {
|
|
for _, s := range v.servers {
|
|
s.Close(ctx)
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
// handleProcessMessage receives and handles the message from processes,
|
|
// which are commonly used for graceful reloading feature.
|
|
func handleProcessMessage() {
|
|
var (
|
|
ctx = context.TODO()
|
|
)
|
|
for {
|
|
if msg := gproc.Receive(adminGProcCommGroup); msg != nil {
|
|
if bytes.EqualFold(msg.Data, []byte("exit")) {
|
|
intlog.Printf(ctx, "%d: process message: exit", gproc.Pid())
|
|
shutdownWebServersGracefully(ctx, nil)
|
|
allShutdownChan <- struct{}{}
|
|
intlog.Printf(ctx, "%d: process message: exit done", gproc.Pid())
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|