remove Sharding feature

This commit is contained in:
John Guo
2022-05-07 16:38:17 +08:00
parent 5332ce4c79
commit c90f91dcbe
8 changed files with 35 additions and 457 deletions

View File

@ -104,7 +104,7 @@ func (d *Driver) FilteredLink() string {
// GetChars returns the security char for this type of database.
func (d *Driver) GetChars() (charLeft string, charRight string) {
return "\"", "\""
return `"`, `"`
}
// DoFilter deals with the sql string before commits it to underlying sql driver.

View File

@ -108,7 +108,7 @@ func (d *Driver) FilteredLink() string {
// GetChars returns the security char for this type of database.
func (d *Driver) GetChars() (charLeft string, charRight string) {
return "\"", "\""
return `"`, `"`
}
// DoFilter deals with the sql string before commits it to underlying sql driver.

View File

@ -106,7 +106,7 @@ func (d *Driver) FilteredLink() string {
// GetChars returns the security char for this type of database.
func (d *Driver) GetChars() (charLeft string, charRight string) {
return "\"", "\""
return `"`, `"`
}
// DoFilter deals with the sql string before commits it to underlying sql driver.

View File

@ -137,31 +137,6 @@ type sqlParsingHandlerOutput struct {
DoCommitInput
}
func (c *Core) sqlParsingHandler(ctx context.Context, in sqlParsingHandlerInput) (out *sqlParsingHandlerOutput, err error) {
var shardingOut *callShardingHandlerFromCtxOutput
// Sharding handling.
shardingOut, err = c.callShardingHandlerFromCtx(ctx, callShardingHandlerFromCtxInput{
Sql: in.Sql,
FormattedSql: in.FormattedSql,
})
if err != nil {
return
}
if shardingOut != nil {
if shardingOut.Sql != "" {
in.Sql = shardingOut.Sql
}
// If schema changes, it here creates and uses a new DB link operation object.
if shardingOut.Schema != c.db.GetSchema() {
in.Link, err = c.db.GetCore().GetLink(ctx, in.Link.IsOnMaster(), shardingOut.Schema)
}
}
out = &sqlParsingHandlerOutput{
DoCommitInput: in.DoCommitInput,
}
return
}
// DoCommit commits current sql and arguments to underlying sql driver.
func (c *Core) DoCommit(ctx context.Context, in DoCommitInput) (out DoCommitOutput, err error) {
// Inject internal data into ctx, especially for transaction creating.
@ -180,18 +155,6 @@ func (c *Core) DoCommit(ctx context.Context, in DoCommitInput) (out DoCommitOutp
timestampMilli1 = gtime.TimestampMilli()
)
// SQL parser handler.
sqlParsingHandlerOut, err := c.sqlParsingHandler(ctx, sqlParsingHandlerInput{
DoCommitInput: in,
FormattedSql: formattedSql,
})
if err != nil {
return
}
if sqlParsingHandlerOut != nil {
in = sqlParsingHandlerOut.DoCommitInput
}
// Trace span start.
tr := otel.GetTracerProvider().Tracer(traceInstrumentName, trace.WithInstrumentationVersion(gf.VERSION))
ctx, span := tr.Start(ctx, in.Type, trace.WithSpanKind(trace.SpanKindInternal))

View File

@ -13,7 +13,6 @@ import (
"net/url"
_ "github.com/go-sql-driver/mysql"
"github.com/gogf/gf/v2/errors/gcode"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/internal/intlog"

View File

@ -18,39 +18,38 @@ import (
// Model is core struct implementing the DAO for ORM.
type Model struct {
*modelWhereBuilder
db DB // Underlying DB interface.
tx *TX // Underlying TX interface.
rawSql string // rawSql is the raw SQL string which marks a raw SQL based Model not a table based Model.
schema string // Custom database schema.
linkType int // Mark for operation on master or slave.
tablesInit string // Table names when model initialization.
tables string // Operation table names, which can be more than one table names and aliases, like: "user", "user u", "user u, user_detail ud".
fields string // Operation fields, multiple fields joined using char ','.
fieldsEx string // Excluded operation fields, multiple fields joined using char ','.
withArray []interface{} // Arguments for With feature.
withAll bool // Enable model association operations on all objects that have "with" tag in the struct.
extraArgs []interface{} // Extra custom arguments for sql, which are prepended to the arguments before sql committed to underlying driver.
whereBuilder *WhereBuilder // Condition builder for where operation.
groupBy string // Used for "group by" statement.
orderBy string // Used for "order by" statement.
having []interface{} // Used for "having..." statement.
start int // Used for "select ... start, limit ..." statement.
limit int // Used for "select ... start, limit ..." statement.
option int // Option for extra operation features.
offset int // Offset statement for some databases grammar.
data interface{} // Data for operation, which can be type of map/[]map/struct/*struct/string, etc.
batch int // Batch number for batch Insert/Replace/Save operations.
filter bool // Filter data and where key-value pairs according to the fields of the table.
distinct string // Force the query to only return distinct results.
lockInfo string // Lock for update or in shared lock.
cacheEnabled bool // Enable sql result cache feature, which is mainly for indicating cache duration(especially 0) usage.
cacheOption CacheOption // Cache option for query statement.
hookHandler HookHandler // Hook functions for model hook feature.
shardingHandler ShardingHandler // Custom sharding handler for sharding feature.
unscoped bool // Disables soft deleting features when select/delete operations.
safe bool // If true, it clones and returns a new model object whenever operation done; or else it changes the attribute of current model.
onDuplicate interface{} // onDuplicate is used for ON "DUPLICATE KEY UPDATE" statement.
onDuplicateEx interface{} // onDuplicateEx is used for excluding some columns ON "DUPLICATE KEY UPDATE" statement.
db DB // Underlying DB interface.
tx *TX // Underlying TX interface.
rawSql string // rawSql is the raw SQL string which marks a raw SQL based Model not a table based Model.
schema string // Custom database schema.
linkType int // Mark for operation on master or slave.
tablesInit string // Table names when model initialization.
tables string // Operation table names, which can be more than one table names and aliases, like: "user", "user u", "user u, user_detail ud".
fields string // Operation fields, multiple fields joined using char ','.
fieldsEx string // Excluded operation fields, multiple fields joined using char ','.
withArray []interface{} // Arguments for With feature.
withAll bool // Enable model association operations on all objects that have "with" tag in the struct.
extraArgs []interface{} // Extra custom arguments for sql, which are prepended to the arguments before sql committed to underlying driver.
whereBuilder *WhereBuilder // Condition builder for where operation.
groupBy string // Used for "group by" statement.
orderBy string // Used for "order by" statement.
having []interface{} // Used for "having..." statement.
start int // Used for "select ... start, limit ..." statement.
limit int // Used for "select ... start, limit ..." statement.
option int // Option for extra operation features.
offset int // Offset statement for some databases grammar.
data interface{} // Data for operation, which can be type of map/[]map/struct/*struct/string, etc.
batch int // Batch number for batch Insert/Replace/Save operations.
filter bool // Filter data and where key-value pairs according to the fields of the table.
distinct string // Force the query to only return distinct results.
lockInfo string // Lock for update or in shared lock.
cacheEnabled bool // Enable sql result cache feature, which is mainly for indicating cache duration(especially 0) usage.
cacheOption CacheOption // Cache option for query statement.
hookHandler HookHandler // Hook functions for model hook feature.
unscoped bool // Disables soft deleting features when select/delete operations.
safe bool // If true, it clones and returns a new model object whenever operation done; or else it changes the attribute of current model.
onDuplicate interface{} // onDuplicate is used for ON "DUPLICATE KEY UPDATE" statement.
onDuplicateEx interface{} // onDuplicateEx is used for excluding some columns ON "DUPLICATE KEY UPDATE" statement.
}
type modelWhereBuilder = WhereBuilder

View File

@ -1,245 +0,0 @@
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.
package gdb
import (
"context"
"strings"
"github.com/gogf/gf/v2/container/gvar"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/os/gctx"
"github.com/longbridgeapp/sqlparser"
)
// ShardingInput is input parameters for custom sharding handler.
type ShardingInput struct {
Table string // Current operation table name.
Schema string // Current operation schema, usually empty string which means uses default schema from configuration.
OperationData map[string]Value // Accurate readonly key-value data pairs from INSERT/UPDATE statement.
ConditionData map[string]Value // Accurate readonly key-value condition pairs from SELECT/UPDATE/DELETE statement.
}
// ShardingOutput is output parameters for custom sharding handler.
type ShardingOutput struct {
Table string // New table name for current operation. Use empty string for no changes of table name.
Schema string // New schema name for current operation. Use empty string for using default schema from configuration.
}
// ShardingHandler is a custom function for custom sharding table and schema for DB operation.
type ShardingHandler func(ctx context.Context, in ShardingInput) (out *ShardingOutput, err error)
const (
ctxKeyForShardingHandler gctx.StrKey = "ShardingHandler"
)
// Sharding creates and returns a new model with sharding handler.
func (m *Model) Sharding(handler ShardingHandler) *Model {
var (
ctx = m.GetCtx()
model = m.getModel()
)
model.shardingHandler = handler
// Inject sharding handler into context.
model = model.Ctx(model.injectShardingInputCaller(ctx))
return model
}
// injectShardingInputCaller injects custom sharding handler into context.
func (m *Model) injectShardingInputCaller(ctx context.Context) context.Context {
if m.shardingHandler == nil {
return ctx
}
if ctx.Value(ctxKeyForShardingHandler) != nil {
return ctx
}
return context.WithValue(ctx, ctxKeyForShardingHandler, m.shardingHandler)
}
type callShardingHandlerFromCtxInput struct {
Sql string
FormattedSql string
}
type callShardingHandlerFromCtxOutput struct {
Sql string
Table string
Schema string
ParsedSqlOutput *parseFormattedSqlOutput
}
func (c *Core) callShardingHandlerFromCtx(
ctx context.Context, in callShardingHandlerFromCtxInput,
) (out *callShardingHandlerFromCtxOutput, err error) {
var (
newSql = in.Sql
ctxValue interface{}
shardingHandler ShardingHandler
ok bool
)
// If no sharding handler, it does nothing.
if ctxValue = ctx.Value(ctxKeyForShardingHandler); ctxValue == nil {
return nil, nil
}
if shardingHandler, ok = ctxValue.(ShardingHandler); !ok {
return nil, nil
}
parsedOut, err := c.parseFormattedSql(in.FormattedSql)
if err != nil {
return nil, err
}
var shardingIn = ShardingInput{
Table: parsedOut.Table,
Schema: c.db.GetSchema(),
OperationData: parsedOut.OperationData,
ConditionData: parsedOut.ConditionData,
}
shardingOut, err := shardingHandler(ctx, shardingIn)
if err != nil {
return nil, gerror.Wrap(err, `calling sharding handler failed`)
}
if shardingOut.Table != shardingIn.Table || shardingOut.Schema != shardingIn.Schema {
if shardingOut.Table != shardingIn.Table {
newSql, err = c.formatSqlWithNewTable(in.Sql, shardingOut.Table)
if err != nil {
return nil, err
}
}
out = &callShardingHandlerFromCtxOutput{
Sql: newSql,
Table: shardingOut.Table,
Schema: shardingOut.Schema,
ParsedSqlOutput: parsedOut,
}
return out, nil
}
return nil, nil
}
// formatSqlWithNewTable modifies given `sql` and returns a sql with new table name `table`.
func (c *Core) formatSqlWithNewTable(sql, table string) (newSql string, err error) {
parsedStmt, err := sqlparser.NewParser(strings.NewReader(sql)).ParseStatement()
if err != nil {
return "", gerror.Wrapf(err, `parse failed for SQL: %s`, sql)
}
newTable := &sqlparser.TableName{Name: &sqlparser.Ident{Name: table}}
switch stmt := parsedStmt.(type) {
case *sqlparser.SelectStatement:
stmt.FromItems = newTable
return stmt.String(), nil
case *sqlparser.InsertStatement:
stmt.TableName = newTable
return stmt.String(), nil
case *sqlparser.UpdateStatement:
stmt.TableName = newTable
return stmt.String(), nil
case *sqlparser.DeleteStatement:
stmt.TableName = newTable
return stmt.String(), nil
default:
return "", gerror.Wrapf(err, `unsupported SQL: %s`, sql)
}
}
type parseFormattedSqlOutput struct {
Table string
OperationData map[string]Value
ConditionData map[string]Value
ParsedStmt sqlparser.Statement
SelectedFields []string
}
func (c *Core) parseFormattedSql(formattedSql string) (*parseFormattedSqlOutput, error) {
var (
condition sqlparser.Expr
err error
out = &parseFormattedSqlOutput{
SelectedFields: make([]string, 0),
OperationData: make(map[string]Value),
ConditionData: make(map[string]Value),
}
)
out.ParsedStmt, err = sqlparser.NewParser(strings.NewReader(formattedSql)).ParseStatement()
if err != nil {
return nil, gerror.Wrapf(err, `parse failed for SQL: %s`, formattedSql)
}
switch stmt := out.ParsedStmt.(type) {
case *sqlparser.SelectStatement:
if stmt.FromItems != nil {
table, ok := stmt.FromItems.(*sqlparser.TableName)
if !ok {
return nil, gerror.Newf(
`invalid table name "%s" in SQL: %s`,
stmt.FromItems.String(), formattedSql,
)
}
out.Table = table.TableName()
}
condition = stmt.Condition
if stmt.Columns != nil {
for _, column := range *stmt.Columns {
if column.Alias != nil {
out.SelectedFields = append(out.SelectedFields, column.Alias.Name)
} else if column.Expr != nil {
out.SelectedFields = append(out.SelectedFields, column.Expr.String())
}
}
}
case *sqlparser.InsertStatement:
out.Table = stmt.TableName.TableName()
if len(stmt.Expressions) > 0 && len(stmt.ColumnNames) > 0 {
names := make([]string, len(stmt.ColumnNames))
for i, ident := range stmt.ColumnNames {
names[i] = ident.Name
}
// It just uses the first item.
for i, expr := range stmt.Expressions[0].Exprs {
c.injectDataByExpr(out.OperationData, names[i], expr)
}
}
case *sqlparser.UpdateStatement:
out.Table = stmt.TableName.TableName()
condition = stmt.Condition
if len(stmt.Assignments) > 0 {
for _, assignment := range stmt.Assignments {
if len(assignment.Columns) > 0 {
c.injectDataByExpr(out.OperationData, assignment.Columns[0].Name, assignment.Expr)
}
}
}
case *sqlparser.DeleteStatement:
out.Table = stmt.TableName.TableName()
condition = stmt.Condition
default:
return nil, gerror.Wrapf(err, `unsupported SQL: %s`, formattedSql)
}
err = sqlparser.Walk(sqlparser.VisitFunc(func(node sqlparser.Node) error {
if n, ok := node.(*sqlparser.BinaryExpr); ok {
if x, ok := n.X.(*sqlparser.Ident); ok {
if n.Op == sqlparser.EQ {
c.injectDataByExpr(out.ConditionData, x.Name, n.Y)
}
}
}
return nil
}), condition)
return out, err
}
func (c *Core) injectDataByExpr(data map[string]Value, name string, expr sqlparser.Expr) {
switch exprImp := expr.(type) {
case *sqlparser.StringLit:
data[name] = gvar.New(exprImp.Value)
case *sqlparser.NumberLit:
data[name] = gvar.New(exprImp.Value)
default:
data[name] = gvar.New(exprImp.String())
}
}

View File

@ -1,138 +0,0 @@
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.
package gdb_test
import (
"context"
"testing"
"github.com/gogf/gf/v2/database/gdb"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/test/gtest"
)
func Test_Model_Sharding(t *testing.T) {
table1 := createTable()
table2 := createTable()
defer dropTable(table1)
defer dropTable(table2)
gtest.C(t, func(t *gtest.T) {
_, err1 := db.Model(table1).Data(g.Map{
"id": 1,
}).Insert()
t.AssertNil(err1)
_, err2 := db.Model(table2).Data(g.Map{
"id": 2,
}).Insert()
t.AssertNil(err2)
})
// no sharding.
gtest.C(t, func(t *gtest.T) {
all, err := db.Model(table1).All()
t.AssertNil(err)
t.Assert(len(all), 1)
t.Assert(all[0]["id"].String(), 1)
})
// with sharding handler.
gtest.C(t, func(t *gtest.T) {
all, err := db.Model(table1).Sharding(func(ctx context.Context, in gdb.ShardingInput) (out *gdb.ShardingOutput, err error) {
out = &gdb.ShardingOutput{
Table: table2,
}
return
}).All()
t.AssertNil(err)
t.Assert(len(all), 1)
t.Assert(all[0]["id"].String(), 2)
})
// with sharding handler and no existence table name.
gtest.C(t, func(t *gtest.T) {
all, err := db.Model("none").Sharding(func(ctx context.Context, in gdb.ShardingInput) (out *gdb.ShardingOutput, err error) {
out = &gdb.ShardingOutput{
Table: table2,
}
return
}).All()
t.AssertNil(err)
t.Assert(len(all), 1)
t.Assert(all[0]["id"].String(), 2)
})
// with sharding handler and no existence table name and tables fields retrieving.
gtest.C(t, func(t *gtest.T) {
type User struct {
Id int
Passport string
Password string
NickName string
}
var users []User
err := db.Model("none").Sharding(func(ctx context.Context, in gdb.ShardingInput) (out *gdb.ShardingOutput, err error) {
out = &gdb.ShardingOutput{
Table: table2,
}
return
}).Scan(&users)
t.AssertNil(err)
t.Assert(len(users), 1)
t.Assert(users[0].Id, 2)
})
}
func Test_Model_Sharding_Schema(t *testing.T) {
var (
db1 = db
db2 = db.Schema(TestSchema2)
table1 = createTableWithDb(db1)
table2 = createTableWithDb(db2)
)
defer dropTableWithDb(db1, table1)
defer dropTableWithDb(db2, table2)
gtest.C(t, func(t *gtest.T) {
_, err1 := db1.Model(table1).Data(g.Map{
"id": 1,
}).Insert()
t.AssertNil(err1)
_, err2 := db2.Model(table2).Data(g.Map{
"id": 2,
}).Insert()
t.AssertNil(err2)
})
// no sharding.
gtest.C(t, func(t *gtest.T) {
all, err := db1.Model(table1).All()
t.AssertNil(err)
t.Assert(len(all), 1)
t.Assert(all[0]["id"].String(), 1)
})
gtest.C(t, func(t *gtest.T) {
_, err := db1.Model(table2).All()
// Table not exist error.
t.AssertNE(err, nil)
})
gtest.C(t, func(t *gtest.T) {
all, err := db2.Model(table2).All()
t.AssertNil(err)
t.Assert(len(all), 1)
t.Assert(all[0]["id"].String(), 2)
})
// with sharding handler and no existence table name and schema change.
gtest.C(t, func(t *gtest.T) {
all, err := db1.Model("none").Sharding(func(ctx context.Context, in gdb.ShardingInput) (out *gdb.ShardingOutput, err error) {
out = &gdb.ShardingOutput{
Table: table2,
Schema: TestSchema2,
}
return
}).All()
t.AssertNil(err)
t.Assert(len(all), 1)
t.Assert(all[0]["id"].String(), 2)
})
}