mirror of
https://gitee.com/johng/gf
synced 2026-06-06 02:25:47 +08:00
improve clickhouse driver
This commit is contained in:
@ -14,7 +14,7 @@ import _ "github.com/gogf/gf/contrib/drivers/pgsql/v2"
|
||||
|
||||
# Supported Drivers
|
||||
|
||||
## MySQL
|
||||
## MySQL/MariaDB/TiDB
|
||||
|
||||
BuiltIn supported, nothing todo.
|
||||
|
||||
@ -50,6 +50,17 @@ Note:
|
||||
- It does not support `Save/Replace` features.
|
||||
- It does not support `LastInsertId`.
|
||||
|
||||
## ClickHouse
|
||||
```
|
||||
import _ "github.com/gogf/gf/contrib/drivers/clickhouse/v2"
|
||||
```
|
||||
Note:
|
||||
- It does not support `InsertIgnore/InsertGetId` features.
|
||||
- It does not support `Save/Replace` features.
|
||||
- It does not support `Transaction` feature.
|
||||
- It does not support `Transaction` feature.
|
||||
|
||||
|
||||
# Custom Drivers
|
||||
|
||||
It's quick and easy, please refer to current driver source.
|
||||
|
||||
@ -12,9 +12,10 @@ import (
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/ClickHouse/clickhouse-go"
|
||||
"strings"
|
||||
|
||||
"github.com/ClickHouse/clickhouse-go"
|
||||
|
||||
"github.com/gogf/gf/v2/container/gmap"
|
||||
"github.com/gogf/gf/v2/database/gdb"
|
||||
"github.com/gogf/gf/v2/errors/gcode"
|
||||
@ -32,11 +33,11 @@ type Driver struct {
|
||||
var (
|
||||
// tableFieldsMap caches the table information retrieved from database.
|
||||
tableFieldsMap = gmap.New(true)
|
||||
errUnsupportedInsertIgnore = errors.New("unsupported method:InsertIgnore")
|
||||
errUnsupportedInsertGetId = errors.New("unsupported method:InsertGetId")
|
||||
errUnsupportedReplace = errors.New("unsupported method:Replace")
|
||||
errUnsupportedBegin = errors.New("unsupported method:Begin")
|
||||
errUnsupportedTransaction = errors.New("unsupported method:Transaction")
|
||||
errUnsupportedInsertIgnore = errors.New("unsupported method: InsertIgnore")
|
||||
errUnsupportedInsertGetId = errors.New("unsupported method: InsertGetId")
|
||||
errUnsupportedReplace = errors.New("unsupported method: Replace")
|
||||
errUnsupportedBegin = errors.New("unsupported method: Begin")
|
||||
errUnsupportedTransaction = errors.New("unsupported method: Transaction")
|
||||
errSQLNull = errors.New("SQL cannot be null")
|
||||
)
|
||||
|
||||
@ -105,7 +106,9 @@ func (d *Driver) Tables(ctx context.Context, schema ...string) (tables []string,
|
||||
|
||||
// TableFields retrieves and returns the fields' information of specified table of current schema.
|
||||
// Also see DriverMysql.TableFields.
|
||||
func (d *Driver) TableFields(ctx context.Context, table string, schema ...string) (fields map[string]*gdb.TableField, err error) {
|
||||
func (d *Driver) TableFields(
|
||||
ctx context.Context, table string, schema ...string,
|
||||
) (fields map[string]*gdb.TableField, err error) {
|
||||
charL, charR := d.GetChars()
|
||||
table = gstr.Trim(table, charL+charR)
|
||||
if gstr.Contains(table, " ") {
|
||||
@ -204,11 +207,13 @@ func (d *Driver) ping(conn *sql.DB) error {
|
||||
}
|
||||
|
||||
// DoFilter handles the sql before posts it to database.
|
||||
func (d *Driver) DoFilter(ctx context.Context, link gdb.Link, originSql string, args []interface{}) (newSql string, newArgs []interface{}, err error) {
|
||||
// replace STD SQL to Clickhouse SQL grammar
|
||||
// MySQL eg: UPDATE visits SET xxx
|
||||
func (d *Driver) DoFilter(
|
||||
ctx context.Context, link gdb.Link, originSql string, args []interface{},
|
||||
) (newSql string, newArgs []interface{}, err error) {
|
||||
// It replaces STD SQL to Clickhouse SQL grammar.
|
||||
// MySQL eg: UPDATE visits SET xxx
|
||||
// Clickhouse eg: ALTER TABLE visits UPDATE xxx
|
||||
// MySQL eg: DELETE FROM VISIT
|
||||
// MySQL eg: DELETE FROM VISIT
|
||||
// Clickhouse eg: ALTER TABLE VISIT DELETE WHERE filter_expr
|
||||
result, err := gregex.MatchString("(?i)^UPDATE|DELETE", originSql)
|
||||
if err != nil {
|
||||
@ -238,7 +243,9 @@ func (d *Driver) DoCommit(ctx context.Context, in gdb.DoCommitInput) (out gdb.Do
|
||||
return d.Core.DoCommit(ctx, in)
|
||||
}
|
||||
|
||||
func (d *Driver) DoInsert(ctx context.Context, link gdb.Link, table string, list gdb.List, option gdb.DoInsertOption) (result sql.Result, err error) {
|
||||
func (d *Driver) DoInsert(
|
||||
ctx context.Context, link gdb.Link, table string, list gdb.List, option gdb.DoInsertOption,
|
||||
) (result sql.Result, err error) {
|
||||
var (
|
||||
keys []string // Field names.
|
||||
valueHolder = make([]string, 0)
|
||||
@ -270,7 +277,7 @@ func (d *Driver) DoInsert(ctx context.Context, link gdb.Link, table string, list
|
||||
return
|
||||
}
|
||||
for i := 0; i < len(list); i++ {
|
||||
params := []interface{}{} // Values that will be committed to underlying database driver.
|
||||
params := make([]interface{}, 0) // Values that will be committed to underlying database driver.
|
||||
for _, k := range keys {
|
||||
params = append(params, list[i][k])
|
||||
}
|
||||
|
||||
@ -53,7 +53,7 @@ func (c *Core) Ctx(ctx context.Context) DB {
|
||||
panic(err)
|
||||
}
|
||||
newCore.ctx = WithDB(ctx, newCore.db)
|
||||
newCore.ctx = c.injectInternalCtxData(newCore.ctx)
|
||||
newCore.ctx = c.InjectInternalCtxData(newCore.ctx)
|
||||
return newCore.db
|
||||
}
|
||||
|
||||
@ -64,7 +64,7 @@ func (c *Core) GetCtx() context.Context {
|
||||
if ctx == nil {
|
||||
ctx = context.TODO()
|
||||
}
|
||||
return c.injectInternalCtxData(ctx)
|
||||
return c.InjectInternalCtxData(ctx)
|
||||
}
|
||||
|
||||
// GetCtxTimeout returns the context and cancel function for specified timeout type.
|
||||
|
||||
@ -31,10 +31,10 @@ const (
|
||||
// for example: `clickhouse`. The `clickhouse` does not support fetching insert/update results,
|
||||
// but returns errors when execute `RowsAffected`. It here ignores the calling of `RowsAffected`
|
||||
// to avoid triggering errors, rather than ignoring errors after they are triggered.
|
||||
ignoreResultInCtx gctx.StrKey = "IgnoreResult"
|
||||
ignoreResultKeyInCtx gctx.StrKey = "IgnoreResult"
|
||||
)
|
||||
|
||||
func (c *Core) injectInternalCtxData(ctx context.Context) context.Context {
|
||||
func (c *Core) InjectInternalCtxData(ctx context.Context) context.Context {
|
||||
// If the internal data is already injected, it does nothing.
|
||||
if ctx.Value(internalCtxDataKeyInCtx) != nil {
|
||||
return ctx
|
||||
@ -44,25 +44,23 @@ func (c *Core) injectInternalCtxData(ctx context.Context) context.Context {
|
||||
})
|
||||
}
|
||||
|
||||
func (c *Core) InjectIgnoreResult(ctx context.Context) context.Context {
|
||||
if ctx.Value(ignoreResultInCtx) != nil {
|
||||
return ctx
|
||||
}
|
||||
return context.WithValue(ctx, ignoreResultInCtx, &internalCtxData{
|
||||
DB: c.db,
|
||||
})
|
||||
}
|
||||
|
||||
func (c *Core) getIgnoreResultFromCtx(ctx context.Context) *internalCtxData {
|
||||
if v := ctx.Value(ignoreResultInCtx); v != nil {
|
||||
return v.(*internalCtxData)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Core) getInternalCtxDataFromCtx(ctx context.Context) *internalCtxData {
|
||||
func (c *Core) GetInternalCtxDataFromCtx(ctx context.Context) *internalCtxData {
|
||||
if v := ctx.Value(internalCtxDataKeyInCtx); v != nil {
|
||||
return v.(*internalCtxData)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Core) InjectIgnoreResult(ctx context.Context) context.Context {
|
||||
if ctx.Value(ignoreResultKeyInCtx) != nil {
|
||||
return ctx
|
||||
}
|
||||
return context.WithValue(ctx, ignoreResultKeyInCtx, true)
|
||||
}
|
||||
|
||||
func (c *Core) GetIgnoreResultFromCtx(ctx context.Context) bool {
|
||||
if ctx.Value(ignoreResultKeyInCtx) != nil {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
@ -159,7 +159,7 @@ func (tx *TX) transactionKeyForNestedPoint() string {
|
||||
func (tx *TX) Ctx(ctx context.Context) *TX {
|
||||
tx.ctx = ctx
|
||||
if tx.ctx != nil {
|
||||
tx.ctx = tx.db.GetCore().injectInternalCtxData(tx.ctx)
|
||||
tx.ctx = tx.db.GetCore().InjectInternalCtxData(tx.ctx)
|
||||
}
|
||||
return tx
|
||||
}
|
||||
|
||||
@ -165,7 +165,7 @@ func (c *Core) sqlParsingHandler(ctx context.Context, in sqlParsingHandlerInput)
|
||||
// DoCommit commits current sql and arguments to underlying sql driver.
|
||||
func (c *Core) DoCommit(ctx context.Context, in DoCommitInput) (out DoCommitOutput, err error) {
|
||||
// Inject internal data into ctx, especially for transaction creating.
|
||||
ctx = c.injectInternalCtxData(ctx)
|
||||
ctx = c.InjectInternalCtxData(ctx)
|
||||
|
||||
var (
|
||||
sqlTx *sql.Tx
|
||||
@ -261,7 +261,7 @@ func (c *Core) DoCommit(ctx context.Context, in DoCommitInput) (out DoCommitOutp
|
||||
}
|
||||
// Result handling.
|
||||
switch {
|
||||
case sqlResult != nil && c.getIgnoreResultFromCtx(ctx) == nil:
|
||||
case sqlResult != nil && !c.GetIgnoreResultFromCtx(ctx):
|
||||
rowsAffected, err = sqlResult.RowsAffected()
|
||||
out.Result = sqlResult
|
||||
|
||||
@ -401,7 +401,7 @@ func (c *Core) RowsToResult(ctx context.Context, rows *sql.Rows) (Result, error)
|
||||
columnNames[k] = v.Name()
|
||||
}
|
||||
if len(columnNames) > 0 {
|
||||
if internalData := c.getInternalCtxDataFromCtx(ctx); internalData != nil {
|
||||
if internalData := c.GetInternalCtxDataFromCtx(ctx); internalData != nil {
|
||||
internalData.FirstResultColumn = columnNames[0]
|
||||
}
|
||||
}
|
||||
|
||||
@ -75,7 +75,7 @@ func (m *Model) getSelectResultFromCache(ctx context.Context, sql string, args .
|
||||
)
|
||||
defer func() {
|
||||
if cacheItem != nil {
|
||||
if internalData := m.db.GetCore().getInternalCtxDataFromCtx(ctx); internalData != nil {
|
||||
if internalData := m.db.GetCore().GetInternalCtxDataFromCtx(ctx); internalData != nil {
|
||||
if internalData.FirstResultColumn == "" {
|
||||
internalData.FirstResultColumn = cacheItem.FirstResultColumn
|
||||
}
|
||||
@ -114,7 +114,7 @@ func (m *Model) saveSelectResultToCache(ctx context.Context, result Result, sql
|
||||
var cacheItem = &selectCacheItem{
|
||||
Result: result,
|
||||
}
|
||||
if internalData := m.db.GetCore().getInternalCtxDataFromCtx(ctx); internalData != nil {
|
||||
if internalData := m.db.GetCore().GetInternalCtxDataFromCtx(ctx); internalData != nil {
|
||||
cacheItem.FirstResultColumn = internalData.FirstResultColumn
|
||||
}
|
||||
if errCache := cacheObj.Set(ctx, cacheKey, cacheItem, m.cacheOption.Duration); errCache != nil {
|
||||
|
||||
@ -174,7 +174,7 @@ func (m *Model) Value(fieldsAndWhere ...interface{}) (Value, error) {
|
||||
if len(all) == 0 {
|
||||
return gvar.New(nil), nil
|
||||
}
|
||||
if internalData := m.db.GetCore().getInternalCtxDataFromCtx(ctx); internalData != nil {
|
||||
if internalData := m.db.GetCore().GetInternalCtxDataFromCtx(ctx); internalData != nil {
|
||||
record := all[0]
|
||||
if v, ok := record[internalData.FirstResultColumn]; ok {
|
||||
return v, nil
|
||||
@ -391,7 +391,7 @@ func (m *Model) Count(where ...interface{}) (int, error) {
|
||||
return 0, err
|
||||
}
|
||||
if len(all) > 0 {
|
||||
if internalData := m.db.GetCore().getInternalCtxDataFromCtx(ctx); internalData != nil {
|
||||
if internalData := m.db.GetCore().GetInternalCtxDataFromCtx(ctx); internalData != nil {
|
||||
record := all[0]
|
||||
if v, ok := record[internalData.FirstResultColumn]; ok {
|
||||
return v.Int(), nil
|
||||
|
||||
Reference in New Issue
Block a user