mirror of
https://gitee.com/johng/gf
synced 2026-06-07 02:12:11 +08:00
热重启特性开发中
This commit is contained in:
@ -19,6 +19,9 @@ import (
|
||||
"gitee.com/johng/gf/g/container/gmap"
|
||||
"gitee.com/johng/gf/g/container/gtype"
|
||||
"gitee.com/johng/gf/g/container/gqueue"
|
||||
"fmt"
|
||||
"gitee.com/johng/gf/g/os/gpm"
|
||||
"net"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -26,10 +29,12 @@ const (
|
||||
gDEFAULT_SERVER = "default"
|
||||
gDEFAULT_DOMAIN = "default"
|
||||
gDEFAULT_METHOD = "ALL"
|
||||
gDEFAULT_COOKIE_PATH = "/" // 默认path
|
||||
gDEFAULT_COOKIE_MAX_AGE = 86400*365 // 默认cookie有效期(一年)
|
||||
gDEFAULT_SESSION_MAX_AGE = 600 // 默认session有效期(600秒)
|
||||
gDEFAULT_SESSION_ID_NAME = "gfsessionid" // 默认存放Cookie中的SessionId名称
|
||||
gDEFAULT_COOKIE_PATH = "/" // 默认path
|
||||
gDEFAULT_COOKIE_MAX_AGE = 86400*365 // 默认cookie有效期(一年)
|
||||
gDEFAULT_SESSION_MAX_AGE = 600 // 默认session有效期(600秒)
|
||||
gDEFAULT_SESSION_ID_NAME = "gfsessionid" // 默认存放Cookie中的SessionId名称
|
||||
gCHILD_ENVIRON_KEY = "GF_WEB_SERVER_CHILD" // 用以子父进程识别,环境变量名称
|
||||
gCHILD_ENVIRON_STRING = gCHILD_ENVIRON_KEY + "=1" // 用以子父进程识别,环境变量键值设置
|
||||
)
|
||||
|
||||
// ghttp.Server结构体
|
||||
@ -38,7 +43,8 @@ type Server struct {
|
||||
name string // 服务名称,方便识别
|
||||
config ServerConfig // 配置对象
|
||||
status int8 // 当前服务器状态(0:未启动,1:运行中)
|
||||
servers []*http.Server // 底层http.Server列表
|
||||
servers []*gracefulServer // 底层http.Server列表
|
||||
pmanager *gpm.Manager // 进程管理器,用于管理子进程服务
|
||||
methodsMap map[string]bool // 所有支持的HTTP Method(初始化时自动填充)
|
||||
servedCount *gtype.Int // 已经服务的请求数(4-8字节,不考虑溢出情况),同时作为请求ID
|
||||
closeQueue *gqueue.Queue // 请求结束的关闭队列(存放的是需要异步关闭处理的*Request对象)
|
||||
@ -110,7 +116,8 @@ func GetServer(name...interface{}) (*Server) {
|
||||
}
|
||||
s := &Server {
|
||||
name : sname,
|
||||
servers : make([]*http.Server, 0),
|
||||
servers : make([]*gracefulServer, 0),
|
||||
pmanager : gpm.New(),
|
||||
methodsMap : make(map[string]bool),
|
||||
handlerMap : make(HandlerMap),
|
||||
statusHandlerMap : make(map[string]HandlerFunc),
|
||||
@ -161,7 +168,9 @@ func (s *Server) Run() error {
|
||||
s.startCloseQueueLoop()
|
||||
|
||||
// 开始执行底层Web Server创建,端口监听
|
||||
var wg sync.WaitGroup
|
||||
var fd = 3
|
||||
var wg sync.WaitGroup
|
||||
var server *gracefulServer
|
||||
if len(s.config.HTTPSCertPath) > 0 && len(s.config.HTTPSKeyPath) > 0 {
|
||||
// HTTPS
|
||||
if len(s.config.HTTPSAddr) == 0 {
|
||||
@ -175,12 +184,20 @@ func (s *Server) Run() error {
|
||||
for _, v := range array {
|
||||
wg.Add(1)
|
||||
go func(addr string) {
|
||||
server := s.newGracefulServer(addr)
|
||||
if s.isChildProcess() {
|
||||
server = s.newGracefulServer(addr, fd)
|
||||
fd++
|
||||
} else {
|
||||
server = s.newGracefulServer(addr)
|
||||
}
|
||||
if err := server.ListenAndServeTLS(s.config.HTTPSCertPath, s.config.HTTPSKeyPath); err != nil {
|
||||
glog.Error(err)
|
||||
// 如果非关闭错误,那么提示报错,否则认为是正常的服务关闭操作
|
||||
if !strings.EqualFold(http.ErrServerClosed.Error(), err.Error()) {
|
||||
glog.Error(err)
|
||||
}
|
||||
wg.Done()
|
||||
} else {
|
||||
//s.servers = append(s.servers, server)
|
||||
s.servers = append(s.servers, server)
|
||||
}
|
||||
}(v)
|
||||
}
|
||||
@ -193,12 +210,20 @@ func (s *Server) Run() error {
|
||||
for _, v := range array {
|
||||
wg.Add(1)
|
||||
go func(addr string) {
|
||||
server := s.newGracefulServer(addr)
|
||||
if s.isChildProcess() {
|
||||
server = s.newGracefulServer(addr, fd)
|
||||
fd++
|
||||
} else {
|
||||
server = s.newGracefulServer(addr)
|
||||
}
|
||||
if err := server.ListenAndServe(); err != nil {
|
||||
glog.Error(err)
|
||||
// 如果非关闭错误,那么提示报错,否则认为是正常的服务关闭操作
|
||||
if !strings.EqualFold(http.ErrServerClosed.Error(), err.Error()) {
|
||||
glog.Error(err)
|
||||
}
|
||||
wg.Done()
|
||||
} else {
|
||||
//s.servers = append(s.servers, server)
|
||||
s.servers = append(s.servers, server)
|
||||
}
|
||||
}(v)
|
||||
}
|
||||
@ -210,6 +235,65 @@ func (s *Server) Run() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 重启Web Server
|
||||
func (s *Server) Restart() {
|
||||
if pid, err := s.startChildProcess(); err != nil {
|
||||
glog.Printf("server restart failed: %v, continue serving\n", err)
|
||||
} else {
|
||||
glog.Printf("server restart successfully, new pid: %d\n", pid)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// 关闭Web Server
|
||||
func (s *Server) Shutdown() {
|
||||
for _, v := range s.servers {
|
||||
v.
|
||||
if f, e := v.listener.(*net.TCPListener).File(); e == nil {
|
||||
files = append(files, f)
|
||||
} else {
|
||||
return 0, fmt.Errorf("failed to get listener file: %v", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 判断是否为子进程执行
|
||||
func (s *Server) isChildProcess() bool {
|
||||
return os.Getenv(gCHILD_ENVIRON_KEY) != ""
|
||||
}
|
||||
|
||||
// 创建子进程来监听并处理新的HTTP请求,与父进程使用的是同一个socket文件描述符
|
||||
func (s *Server) startChildProcess() (int, error) {
|
||||
if s.isChildProcess() {
|
||||
return 0, errors.New("only main process can fork child process")
|
||||
}
|
||||
// 构造子进程的环境变量,并增加环境变量参数以标识该进程是graceful子进程
|
||||
env := make([]string, 0)
|
||||
for _, value := range os.Environ() {
|
||||
if value != gCHILD_ENVIRON_STRING {
|
||||
env = append(env, value)
|
||||
}
|
||||
}
|
||||
env = append(env, gCHILD_ENVIRON_STRING)
|
||||
// 获取所有http server的file
|
||||
files := []*os.File{ os.Stdin,os.Stdout,os.Stderr}
|
||||
for _, v := range s.servers {
|
||||
if f, e := v.listener.(*net.TCPListener).File(); e == nil {
|
||||
files = append(files, f)
|
||||
} else {
|
||||
return 0, fmt.Errorf("failed to get listener file: %v", e)
|
||||
}
|
||||
}
|
||||
p, err := os.StartProcess(os.Args[0], os.Args, &os.ProcAttr {
|
||||
Env : env,
|
||||
Files : files,
|
||||
})
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to forkexec: %v", err)
|
||||
}
|
||||
return p.Pid, nil
|
||||
}
|
||||
|
||||
// 生成一个底层的Web Server对象
|
||||
func (s *Server) newServer(addr string) *http.Server {
|
||||
return &http.Server {
|
||||
|
||||
@ -10,41 +10,32 @@ import (
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"syscall"
|
||||
"context"
|
||||
"net/http"
|
||||
"os/signal"
|
||||
"crypto/tls"
|
||||
"gitee.com/johng/gf/g/os/glog"
|
||||
)
|
||||
|
||||
const (
|
||||
gGF_WEB_SERVER_GRACEFUL_ENVIRON_KEY = "GF_WEB_SERVER_GRACEFUL"
|
||||
gGF_WEB_SERVER_GRACEFUL_ENVIRON_STRING = gGF_WEB_SERVER_GRACEFUL_ENVIRON_KEY + "=1"
|
||||
gGF_WEB_SERVER_GRACEFUL_LISTENER_FD = 3
|
||||
)
|
||||
|
||||
// 优雅的Web Server对象封装
|
||||
type gracefulServer struct {
|
||||
fd uintptr
|
||||
addr string
|
||||
httpServer *http.Server
|
||||
listener net.Listener
|
||||
isGraceful bool
|
||||
signalChan chan os.Signal
|
||||
shutdownChan chan bool
|
||||
}
|
||||
|
||||
// 创建一个优雅的Http Server
|
||||
func (s *Server) newGracefulServer(addr string) *gracefulServer {
|
||||
isGraceful := false
|
||||
if os.Getenv(gGF_WEB_SERVER_GRACEFUL_ENVIRON_KEY) != "" {
|
||||
isGraceful = true
|
||||
}
|
||||
return &gracefulServer {
|
||||
func (s *Server) newGracefulServer(addr string, fd...int) *gracefulServer {
|
||||
gs := &gracefulServer {
|
||||
addr : addr,
|
||||
httpServer : s.newServer(addr),
|
||||
isGraceful : isGraceful,
|
||||
signalChan : make(chan os.Signal),
|
||||
shutdownChan : make(chan bool),
|
||||
}
|
||||
if len(fd) > 0 && fd[0] > 0 {
|
||||
gs.fd = uintptr(fd[0])
|
||||
}
|
||||
return gs
|
||||
}
|
||||
|
||||
// 执行HTTP监听
|
||||
@ -84,7 +75,6 @@ func (s *gracefulServer) ListenAndServeTLS(certFile, keyFile string) error {
|
||||
|
||||
// 开始执行Web Server服务处理
|
||||
func (s *gracefulServer) doServe() error {
|
||||
go s.handleSignals()
|
||||
err := s.httpServer.Serve(s.listener)
|
||||
<-s.shutdownChan
|
||||
return err
|
||||
@ -94,13 +84,8 @@ func (s *gracefulServer) doServe() error {
|
||||
func (s *gracefulServer) getNetListener(addr string) (net.Listener, error) {
|
||||
var ln net.Listener
|
||||
var err error
|
||||
if s.isGraceful {
|
||||
//path := fmt.Sprintf("%s%sgf.web.server.fd.%d", gfile.TempDir(), gfile.Separator,gtime.Nanosecond())
|
||||
//f, err := gfile.Open(path)
|
||||
//if err != nil {
|
||||
// return nil, err
|
||||
//}
|
||||
f := os.NewFile(gGF_WEB_SERVER_GRACEFUL_LISTENER_FD, "")
|
||||
if s.fd > 0 {
|
||||
f := os.NewFile(s.fd, "")
|
||||
ln, err = net.FileListener(f)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("net.FileListener error: %v", err)
|
||||
@ -116,75 +101,13 @@ func (s *gracefulServer) getNetListener(addr string) (net.Listener, error) {
|
||||
return ln, nil
|
||||
}
|
||||
|
||||
// 处理终端信号指令监听
|
||||
func (s *gracefulServer) handleSignals() {
|
||||
var sig os.Signal
|
||||
|
||||
signal.Notify(
|
||||
s.signalChan,
|
||||
syscall.SIGTERM,
|
||||
syscall.SIGUSR2,
|
||||
)
|
||||
|
||||
for {
|
||||
sig = <-s.signalChan
|
||||
switch sig {
|
||||
case syscall.SIGTERM:
|
||||
glog.Println("received SIGTERM, graceful shutting down HTTP server")
|
||||
s.shutdown()
|
||||
|
||||
case syscall.SIGUSR2:
|
||||
glog.Println("received SIGUSR2, graceful restarting HTTP server")
|
||||
if pid, err := s.startNewProcess(); err != nil {
|
||||
glog.Printf("start new process failed: %v, continue serving\n", err)
|
||||
} else {
|
||||
glog.Printf("start new process successfully, the new pid is %d\n", pid)
|
||||
s.shutdown()
|
||||
}
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 执行请求优雅关闭
|
||||
func (s *gracefulServer) shutdown() {
|
||||
if err := s.httpServer.Shutdown(context.Background()); err != nil {
|
||||
glog.Errorf("server shutdown error: %v\n", err)
|
||||
glog.Errorf("server %s shutdown error: %v\n", s.addr, err)
|
||||
} else {
|
||||
glog.Println("server shutdown success")
|
||||
glog.Printf("server %s shutdown successfully\n", s.addr)
|
||||
s.shutdownChan <- true
|
||||
}
|
||||
}
|
||||
|
||||
// 创建子进程来监听并处理新的HTTP请求,与父进程使用的是同一个socket文件描述符
|
||||
func (s *gracefulServer) startNewProcess() (uintptr, error) {
|
||||
listenerFd, err := s.getTCPListenerFd()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to get socket file descriptor: %v", err)
|
||||
}
|
||||
// 构造子进程的环境变量,并增加环境变量参数以标识该进程是graceful子进程
|
||||
env := make([]string, 0)
|
||||
for _, value := range os.Environ() {
|
||||
if value != gGF_WEB_SERVER_GRACEFUL_ENVIRON_STRING {
|
||||
env = append(env, value)
|
||||
}
|
||||
}
|
||||
env = append(env, gGF_WEB_SERVER_GRACEFUL_ENVIRON_STRING)
|
||||
fork, err := syscall.ForkExec(os.Args[0], os.Args, &syscall.ProcAttr {
|
||||
Env : env,
|
||||
Files : []uintptr{os.Stdin.Fd(), os.Stdout.Fd(), os.Stderr.Fd(), listenerFd},
|
||||
})
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to forkexec: %v", err)
|
||||
}
|
||||
return uintptr(fork), nil
|
||||
}
|
||||
|
||||
// 获得对应net.TCPListener的文件描述符文件ID
|
||||
func (s *gracefulServer) getTCPListenerFd() (uintptr, error) {
|
||||
file, err := s.listener.(*net.TCPListener).File()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return file.Fd(), nil
|
||||
}
|
||||
|
||||
102
g/os/gpm/gpm.go
Normal file
102
g/os/gpm/gpm.go
Normal file
@ -0,0 +1,102 @@
|
||||
// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the MIT License.
|
||||
// If a copy of the MIT was not distributed with this file,
|
||||
// You can obtain one at https://gitee.com/johng/gf.
|
||||
|
||||
// 进程管理.
|
||||
package gpm
|
||||
|
||||
import (
|
||||
"os"
|
||||
"gitee.com/johng/gf/g/container/gmap"
|
||||
)
|
||||
|
||||
// 进程管理器
|
||||
type Manager struct {
|
||||
processes *gmap.IntInterfaceMap // 所管理的子进程map
|
||||
}
|
||||
|
||||
// 子进程
|
||||
type Process struct {
|
||||
pm *Manager // 所属进程管理器
|
||||
path string // 可执行文件绝对路径
|
||||
args []string // 执行参数
|
||||
attr *os.ProcAttr // 进程属性
|
||||
process *os.Process // 底层进程对象
|
||||
}
|
||||
|
||||
// 创建一个进程管理器
|
||||
func New () *Manager {
|
||||
return &Manager{
|
||||
processes : gmap.NewIntInterfaceMap(),
|
||||
}
|
||||
}
|
||||
|
||||
// 创建一个进程(不执行)
|
||||
func (m *Manager) NewProcess(path string, args []string, env []string) *Process {
|
||||
attr := &os.ProcAttr {
|
||||
Env : env,
|
||||
Files : []*os.File{ os.Stdin,os.Stdout,os.Stderr },
|
||||
}
|
||||
return &Process{
|
||||
pm : m,
|
||||
path : path,
|
||||
args : args,
|
||||
attr : attr,
|
||||
}
|
||||
}
|
||||
|
||||
// 获取一个进程
|
||||
func (m *Manager) GetProcess(pid int) *Process {
|
||||
if v := m.processes.Get(pid); v != nil {
|
||||
return v.(*Process)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 获取所有的进程对象,构成列表返回
|
||||
func (m *Manager) Processes() []*Process {
|
||||
processes := make([]*Process, 0)
|
||||
m.processes.RLockFunc(func(m map[int]interface{}) {
|
||||
for _, v := range m {
|
||||
processes = append(processes, v.(*Process))
|
||||
}
|
||||
})
|
||||
return processes
|
||||
}
|
||||
|
||||
// 等待所有子进程结束
|
||||
func (m *Manager) WaitAll() {
|
||||
processes := m.Processes()
|
||||
if len(processes) > 0 {
|
||||
for _, p := range processes {
|
||||
p.Wait()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 关闭所有的进程
|
||||
func (m *Manager) KillAll() error {
|
||||
for _, p := range m.Processes() {
|
||||
if err := p.Kill(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 向所有进程发送信号量
|
||||
func (m *Manager) SignalAll(sig os.Signal) error {
|
||||
for _, p := range m.Processes() {
|
||||
if err := p.Signal(sig); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 当前进程总数
|
||||
func (m *Manager) Size() int {
|
||||
return m.processes.Size()
|
||||
}
|
||||
90
g/os/gpm/gpm_proccess.go
Normal file
90
g/os/gpm/gpm_proccess.go
Normal file
@ -0,0 +1,90 @@
|
||||
// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the MIT License.
|
||||
// If a copy of the MIT was not distributed with this file,
|
||||
// You can obtain one at https://gitee.com/johng/gf.
|
||||
|
||||
package gpm
|
||||
|
||||
import (
|
||||
"os"
|
||||
)
|
||||
|
||||
// 运行进程
|
||||
func (p *Process) Run() (int, error) {
|
||||
if process, err := os.StartProcess(p.path, p.args, p.attr); err == nil {
|
||||
p.process = process
|
||||
p.pm.processes.Set(process.Pid, p)
|
||||
return process.Pid, nil
|
||||
} else {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Process) SetArgs(args []string) {
|
||||
p.args = args
|
||||
}
|
||||
|
||||
func (p *Process) AddArgs(args []string) {
|
||||
for _, v := range args {
|
||||
p.args = append(p.args, v)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Process) SetEnv(env []string) {
|
||||
p.attr.Env = env
|
||||
}
|
||||
|
||||
func (p *Process) AddEnv(env []string) {
|
||||
for _, v := range env {
|
||||
p.attr.Env = append(p.attr.Env, v)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Process) SetAttr(attr *os.ProcAttr) {
|
||||
p.attr = attr
|
||||
}
|
||||
|
||||
func (p *Process) GetAttr() *os.ProcAttr {
|
||||
return p.attr
|
||||
}
|
||||
|
||||
// PID
|
||||
func (p *Process) Pid() int {
|
||||
if p.process != nil {
|
||||
return p.process.Pid
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
// Release releases any resources associated with the Process p,
|
||||
// rendering it unusable in the future.
|
||||
// Release only needs to be called if Wait is not.
|
||||
func (p *Process) Release() error {
|
||||
return p.process.Release()
|
||||
}
|
||||
|
||||
// Kill causes the Process to exit immediately.
|
||||
func (p *Process) Kill() error {
|
||||
if err := p.process.Kill(); err == nil {
|
||||
p.pm.processes.Remove(p.Pid())
|
||||
return nil
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Wait waits for the Process to exit, and then returns a
|
||||
// ProcessState describing its status and an error, if any.
|
||||
// Wait releases any resources associated with the Process.
|
||||
// On most operating systems, the Process must be a child
|
||||
// of the current process or an error will be returned.
|
||||
func (p *Process) Wait() (*os.ProcessState, error) {
|
||||
return p.process.Wait()
|
||||
}
|
||||
|
||||
// Signal sends a signal to the Process.
|
||||
// Sending Interrupt on Windows is not implemented.
|
||||
func (p *Process) Signal(sig os.Signal) error {
|
||||
return p.process.Signal(sig)
|
||||
}
|
||||
@ -5,6 +5,7 @@ import (
|
||||
"net/http"
|
||||
|
||||
"github.com/tabalt/gracehttp"
|
||||
"os"
|
||||
)
|
||||
|
||||
|
||||
@ -20,5 +21,7 @@ func test() {
|
||||
}
|
||||
|
||||
func main() {
|
||||
test()
|
||||
}
|
||||
fmt.Println(os.NewFile(11111, ""))
|
||||
fmt.Println(os.NewFile(111111111, ""))
|
||||
fmt.Println(os.NewFile(33333333333333, ""))
|
||||
}
|
||||
|
||||
@ -2,16 +2,24 @@ package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"gitee.com/johng/gf/g/os/gpm"
|
||||
"os"
|
||||
"time"
|
||||
"gitee.com/johng/gf/g/os/glog"
|
||||
)
|
||||
|
||||
func main() {
|
||||
//var v interface{}
|
||||
m := map[string]int {
|
||||
"age" : 18,
|
||||
m := gpm.New()
|
||||
env := os.Environ()
|
||||
env = append(env, "child=1")
|
||||
p := m.NewProcess(os.Args[0], os.Args, env)
|
||||
if os.Getenv("child") != "" {
|
||||
time.Sleep(3*time.Second)
|
||||
glog.Error("error")
|
||||
} else {
|
||||
pid, err := p.Run()
|
||||
fmt.Println(pid)
|
||||
fmt.Println(err)
|
||||
fmt.Println(p.Wait())
|
||||
}
|
||||
//v = m
|
||||
p := &m
|
||||
(*p)["age"] = 16
|
||||
//fmt.Println(v)
|
||||
fmt.Println(m)
|
||||
}
|
||||
Reference in New Issue
Block a user