mirror of
https://gitee.com/johng/gf
synced 2026-06-26 17:35:40 +08:00
为`gcfg`添加配置文件变更自定义回调,实现了`WatcherAdapter`接口,以下是`AdapterFile`的用法
test.yaml
```
b: "b"
```
```
package main
import (
"fmt"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gcfg"
"github.com/gogf/gf/v2/os/gctx"
)
func main() {
ctx := gctx.New()
file, _ := gcfg.NewAdapterFile("test.yaml")
file.Data(ctx)
file.AddWatcher("test", func() {
value := file.MustGet(ctx, "b")
fmt.Println(value.String())
})
server := g.Server()
server.Run()
}
```
使用`g`和默认配置文件
```
file := g.Cfg().GetAdapter().(*gcfg.AdapterFile)
file.AddWatcher("test", func() {
})
file := g.Cfg().GetAdapter().(*gcfg.AdapterFile)
file.RemoveWatcher("test")
```
注意:由于`gf`的`AdapterFile`采用的是“监听到文件变化后删除缓存、下一次读取时重新初始化”的懒加载方案,所以除了默认加载的`config.xxx`文件外,自定义的配置文件(例如`test.yaml`)都需要在`AddWatcher`之前主动读取一次数据,以完成监听的初始化(例如
`g.Cfg("test").Data(ctx)`)。
---------
Co-authored-by: hailaz <739476267@qq.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Hunk Zhu <hunk@joy999.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
228 lines
6.2 KiB
Go
228 lines
6.2 KiB
Go
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
|
|
//
|
|
// This Source Code Form is subject to the terms of the MIT License.
|
|
// If a copy of the MIT was not distributed with this file,
|
|
// You can obtain one at https://github.com/gogf/gf.
|
|
|
|
// Package consul implements gcfg.Adapter using consul service.
|
|
package consul
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
"github.com/hashicorp/consul/api"
|
|
"github.com/hashicorp/consul/api/watch"
|
|
|
|
"github.com/gogf/gf/v2/encoding/gjson"
|
|
"github.com/gogf/gf/v2/errors/gerror"
|
|
"github.com/gogf/gf/v2/frame/g"
|
|
"github.com/gogf/gf/v2/os/gcfg"
|
|
"github.com/gogf/gf/v2/os/glog"
|
|
)
|
|
|
|
var (
|
|
// Compile-time checking for interface implementation.
|
|
_ gcfg.Adapter = (*Client)(nil)
|
|
_ gcfg.WatcherAdapter = (*Client)(nil)
|
|
)
|
|
|
|
// Config is the configuration object for consul client.
|
|
type Config struct {
|
|
// api.Config in consul package
|
|
ConsulConfig api.Config `v:"required"`
|
|
// As configuration file path key
|
|
Path string `v:"required"`
|
|
// Watch watches remote configuration updates, which updates local configuration in memory immediately when remote configuration changes.
|
|
Watch bool
|
|
// Logging interface, customized by user, default: glog.New()
|
|
Logger glog.ILogger
|
|
}
|
|
|
|
// Client implements gcfg.Adapter implementing using consul service.
|
|
type Client struct {
|
|
// Created config object
|
|
config Config
|
|
// Consul config client
|
|
client *api.Client
|
|
// Configmap content cached. It is `*gjson.Json` value internally.
|
|
value *g.Var
|
|
// Watchers for watching file changes.
|
|
watchers *gcfg.WatcherRegistry
|
|
}
|
|
|
|
// New creates and returns gcfg.Adapter implementing using consul service.
|
|
func New(ctx context.Context, config Config) (adapter gcfg.Adapter, err error) {
|
|
err = g.Validator().Data(config).Run(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if config.Logger == nil {
|
|
config.Logger = glog.New()
|
|
}
|
|
|
|
client := &Client{
|
|
config: config,
|
|
value: g.NewVar(nil, true),
|
|
watchers: gcfg.NewWatcherRegistry(),
|
|
}
|
|
|
|
client.client, err = api.NewClient(&config.ConsulConfig)
|
|
if err != nil {
|
|
return nil, gerror.Wrapf(err, `create consul client failed with config: %+v`, config.ConsulConfig)
|
|
}
|
|
|
|
if err = client.addWatcher(); err != nil {
|
|
return nil, gerror.Wrapf(err, `consul client add watcher failed with config: %+v`, config.ConsulConfig)
|
|
}
|
|
|
|
return client, nil
|
|
}
|
|
|
|
// Available checks and returns the backend configuration service is available.
|
|
// The optional parameter `resource` specifies certain configuration resource.
|
|
//
|
|
// Note that this function does not return error as it just does simply check for
|
|
// backend configuration service.
|
|
func (c *Client) Available(ctx context.Context, resource ...string) (ok bool) {
|
|
if len(resource) == 0 && !c.value.IsNil() {
|
|
return true
|
|
}
|
|
|
|
_, _, err := c.client.KV().Get(c.config.Path, nil)
|
|
|
|
return err == nil
|
|
}
|
|
|
|
// Get retrieves and returns value by specified `pattern` in current resource.
|
|
// Pattern like:
|
|
// "x.y.z" for map item.
|
|
// "x.0.y" for slice item.
|
|
func (c *Client) Get(ctx context.Context, pattern string) (value any, err error) {
|
|
if c.value.IsNil() {
|
|
if err = c.updateLocalValue(); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
return c.value.Val().(*gjson.Json).Get(pattern).Val(), nil
|
|
}
|
|
|
|
// Data retrieves and returns all configuration data in current resource as map.
|
|
// Note that this function may lead lots of memory usage if configuration data is too large,
|
|
// you can implement this function if necessary.
|
|
func (c *Client) Data(ctx context.Context) (data map[string]any, err error) {
|
|
if c.value.IsNil() {
|
|
if err = c.updateLocalValue(); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
return c.value.Val().(*gjson.Json).Map(), nil
|
|
}
|
|
|
|
func (c *Client) updateLocalValue() (err error) {
|
|
content, _, err := c.client.KV().Get(c.config.Path, nil)
|
|
if err != nil {
|
|
return gerror.Wrapf(err, `get config from consul path [%+v] failed`, c.config.Path)
|
|
}
|
|
if content == nil {
|
|
return fmt.Errorf(`get config from consul path [%+v] value is nil`, c.config.Path)
|
|
}
|
|
return c.doUpdate(content.Value)
|
|
}
|
|
|
|
func (c *Client) doUpdate(content []byte) (err error) {
|
|
var j *gjson.Json
|
|
if j, err = gjson.LoadContent(content); err != nil {
|
|
return gerror.Wrapf(err,
|
|
`parse config map item from consul path [%+v] failed`, c.config.Path)
|
|
}
|
|
c.value.Set(j)
|
|
return nil
|
|
}
|
|
|
|
func (c *Client) addWatcher() (err error) {
|
|
if !c.config.Watch {
|
|
return nil
|
|
}
|
|
|
|
plan, err := watch.Parse(map[string]any{
|
|
"type": "key",
|
|
"key": c.config.Path,
|
|
})
|
|
if err != nil {
|
|
return gerror.Wrapf(err, `watch config from consul path %+v failed`, c.config.Path)
|
|
}
|
|
|
|
plan.Handler = func(idx uint64, raw any) {
|
|
var v *api.KVPair
|
|
if raw == nil {
|
|
// nil is a valid return value
|
|
v = nil
|
|
return
|
|
}
|
|
var ok bool
|
|
if v, ok = raw.(*api.KVPair); !ok {
|
|
return
|
|
}
|
|
err = c.doUpdate(v.Value)
|
|
if err != nil {
|
|
c.config.Logger.Errorf(
|
|
context.Background(),
|
|
"watch config from consul path %+v update failed: %s",
|
|
c.config.Path, err,
|
|
)
|
|
} else {
|
|
var m *gjson.Json
|
|
m, err = gjson.LoadContent(v.Value, true)
|
|
if err != nil {
|
|
c.config.Logger.Errorf(
|
|
context.Background(),
|
|
"watch config from consul path %+v parse failed: %s",
|
|
c.config.Path, err,
|
|
)
|
|
} else {
|
|
adapterCtx := NewAdapterCtx().WithOperation(gcfg.OperationUpdate).WithPath(c.config.Path).WithContent(m)
|
|
c.notifyWatchers(adapterCtx.Ctx)
|
|
}
|
|
}
|
|
}
|
|
|
|
plan.Datacenter = c.config.ConsulConfig.Datacenter
|
|
plan.Token = c.config.ConsulConfig.Token
|
|
|
|
go c.startAsynchronousWatch(plan)
|
|
return nil
|
|
}
|
|
|
|
// startAsynchronousWatch starts the asynchronous watch.
|
|
func (c *Client) startAsynchronousWatch(plan *watch.Plan) {
|
|
if err := plan.Run(c.config.ConsulConfig.Address); err != nil {
|
|
c.config.Logger.Errorf(
|
|
context.Background(),
|
|
"watch config from consul path %+v plan start failed: %s",
|
|
c.config.Path, err,
|
|
)
|
|
}
|
|
}
|
|
|
|
// AddWatcher adds a watcher for the specified configuration file.
|
|
func (c *Client) AddWatcher(name string, f func(ctx context.Context)) {
|
|
c.watchers.Add(name, f)
|
|
}
|
|
|
|
// RemoveWatcher removes the watcher for the specified configuration file.
|
|
func (c *Client) RemoveWatcher(name string) {
|
|
c.watchers.Remove(name)
|
|
}
|
|
|
|
// GetWatcherNames returns all watcher names.
|
|
func (c *Client) GetWatcherNames() []string {
|
|
return c.watchers.GetNames()
|
|
}
|
|
|
|
// notifyWatchers notifies all watchers.
|
|
func (c *Client) notifyWatchers(ctx context.Context) {
|
|
c.watchers.Notify(ctx)
|
|
}
|