Files
gf/contrib/config/consul/consul.go
Lance Add ac3efe5a00 feat(os/gcfg): Add file watcher with custom callback support (#4446)
为`gcfg`添加配置文件变更自定义回调,实现了`WatcherAdapter`接口,以下是`AdapterFile`的用法
test.yaml
```
b: "b"

```
```
package main

import (
	"fmt"
	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/os/gcfg"
	"github.com/gogf/gf/v2/os/gctx"
)

func main() {
	ctx := gctx.New()
	file, _ := gcfg.NewAdapterFile("test.yaml")
	file.Data(ctx)
	file.AddWatcher("test", func() {
		value := file.MustGet(ctx, "b")
		fmt.Println(value.String())
	})
	server := g.Server()
	server.Run()
}
```
使用`g`和默认配置文件
```
	file := g.Cfg().GetAdapter().(*gcfg.AdapterFile)
	file.AddWatcher("test", func() {

	})
	file := g.Cfg().GetAdapter().(*gcfg.AdapterFile)
	file.RemoveWatcher("test")
```

注意:由于`gf`的`AdapterFile`使用的监听到文件变化删除缓存下一次重新初始化的懒加载方案,所有除了默认加载的`config.xxx`文件外,自定义的配置文件像`test.yaml`之类的都需要在`AddWatcher`前主动读取一次数据进行初始化监听(
`g.Cfg("test").Data(ctx)`)

---------

Co-authored-by: hailaz <739476267@qq.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Hunk Zhu <hunk@joy999.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-10-15 16:59:52 +08:00

228 lines
6.2 KiB
Go

// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.
// Package consul implements gcfg.Adapter using consul service.
package consul
import (
"context"
"fmt"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api/watch"
"github.com/gogf/gf/v2/encoding/gjson"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gcfg"
"github.com/gogf/gf/v2/os/glog"
)
var (
// Compile-time checking for interface implementation.
_ gcfg.Adapter = (*Client)(nil)
_ gcfg.WatcherAdapter = (*Client)(nil)
)
// Config is the configuration object for consul client.
type Config struct {
// api.Config in consul package
ConsulConfig api.Config `v:"required"`
// As configuration file path key
Path string `v:"required"`
// Watch watches remote configuration updates, which updates local configuration in memory immediately when remote configuration changes.
Watch bool
// Logging interface, customized by user, default: glog.New()
Logger glog.ILogger
}
// Client implements gcfg.Adapter implementing using consul service.
type Client struct {
// Created config object
config Config
// Consul config client
client *api.Client
// Configmap content cached. It is `*gjson.Json` value internally.
value *g.Var
// Watchers for watching file changes.
watchers *gcfg.WatcherRegistry
}
// New creates and returns gcfg.Adapter implementing using consul service.
func New(ctx context.Context, config Config) (adapter gcfg.Adapter, err error) {
err = g.Validator().Data(config).Run(ctx)
if err != nil {
return nil, err
}
if config.Logger == nil {
config.Logger = glog.New()
}
client := &Client{
config: config,
value: g.NewVar(nil, true),
watchers: gcfg.NewWatcherRegistry(),
}
client.client, err = api.NewClient(&config.ConsulConfig)
if err != nil {
return nil, gerror.Wrapf(err, `create consul client failed with config: %+v`, config.ConsulConfig)
}
if err = client.addWatcher(); err != nil {
return nil, gerror.Wrapf(err, `consul client add watcher failed with config: %+v`, config.ConsulConfig)
}
return client, nil
}
// Available checks and returns the backend configuration service is available.
// The optional parameter `resource` specifies certain configuration resource.
//
// Note that this function does not return error as it just does simply check for
// backend configuration service.
func (c *Client) Available(ctx context.Context, resource ...string) (ok bool) {
if len(resource) == 0 && !c.value.IsNil() {
return true
}
_, _, err := c.client.KV().Get(c.config.Path, nil)
return err == nil
}
// Get retrieves and returns value by specified `pattern` in current resource.
// Pattern like:
// "x.y.z" for map item.
// "x.0.y" for slice item.
func (c *Client) Get(ctx context.Context, pattern string) (value any, err error) {
if c.value.IsNil() {
if err = c.updateLocalValue(); err != nil {
return nil, err
}
}
return c.value.Val().(*gjson.Json).Get(pattern).Val(), nil
}
// Data retrieves and returns all configuration data in current resource as map.
// Note that this function may lead lots of memory usage if configuration data is too large,
// you can implement this function if necessary.
func (c *Client) Data(ctx context.Context) (data map[string]any, err error) {
if c.value.IsNil() {
if err = c.updateLocalValue(); err != nil {
return nil, err
}
}
return c.value.Val().(*gjson.Json).Map(), nil
}
func (c *Client) updateLocalValue() (err error) {
content, _, err := c.client.KV().Get(c.config.Path, nil)
if err != nil {
return gerror.Wrapf(err, `get config from consul path [%+v] failed`, c.config.Path)
}
if content == nil {
return fmt.Errorf(`get config from consul path [%+v] value is nil`, c.config.Path)
}
return c.doUpdate(content.Value)
}
func (c *Client) doUpdate(content []byte) (err error) {
var j *gjson.Json
if j, err = gjson.LoadContent(content); err != nil {
return gerror.Wrapf(err,
`parse config map item from consul path [%+v] failed`, c.config.Path)
}
c.value.Set(j)
return nil
}
func (c *Client) addWatcher() (err error) {
if !c.config.Watch {
return nil
}
plan, err := watch.Parse(map[string]any{
"type": "key",
"key": c.config.Path,
})
if err != nil {
return gerror.Wrapf(err, `watch config from consul path %+v failed`, c.config.Path)
}
plan.Handler = func(idx uint64, raw any) {
var v *api.KVPair
if raw == nil {
// nil is a valid return value
v = nil
return
}
var ok bool
if v, ok = raw.(*api.KVPair); !ok {
return
}
err = c.doUpdate(v.Value)
if err != nil {
c.config.Logger.Errorf(
context.Background(),
"watch config from consul path %+v update failed: %s",
c.config.Path, err,
)
} else {
var m *gjson.Json
m, err = gjson.LoadContent(v.Value, true)
if err != nil {
c.config.Logger.Errorf(
context.Background(),
"watch config from consul path %+v parse failed: %s",
c.config.Path, err,
)
} else {
adapterCtx := NewAdapterCtx().WithOperation(gcfg.OperationUpdate).WithPath(c.config.Path).WithContent(m)
c.notifyWatchers(adapterCtx.Ctx)
}
}
}
plan.Datacenter = c.config.ConsulConfig.Datacenter
plan.Token = c.config.ConsulConfig.Token
go c.startAsynchronousWatch(plan)
return nil
}
// startAsynchronousWatch starts the asynchronous watch.
func (c *Client) startAsynchronousWatch(plan *watch.Plan) {
if err := plan.Run(c.config.ConsulConfig.Address); err != nil {
c.config.Logger.Errorf(
context.Background(),
"watch config from consul path %+v plan start failed: %s",
c.config.Path, err,
)
}
}
// AddWatcher adds a watcher for the specified configuration file.
func (c *Client) AddWatcher(name string, f func(ctx context.Context)) {
c.watchers.Add(name, f)
}
// RemoveWatcher removes the watcher for the specified configuration file.
func (c *Client) RemoveWatcher(name string) {
c.watchers.Remove(name)
}
// GetWatcherNames returns all watcher names.
func (c *Client) GetWatcherNames() []string {
return c.watchers.GetNames()
}
// notifyWatchers notifies all watchers.
func (c *Client) notifyWatchers(ctx context.Context) {
c.watchers.Notify(ctx)
}