From 6dacdd60dc11f2d09881dcab31300e9eece8c129 Mon Sep 17 00:00:00 2001 From: John Guo Date: Mon, 21 Mar 2022 21:17:48 +0800 Subject: [PATCH] add sharding feature for package gdb --- cmd/gf/go.mod | 1 + cmd/gf/go.sum | 3 + database/gdb/gdb.go | 2 + database/gdb/gdb_core_link.go | 18 +- database/gdb/gdb_core_sharding.go | 228 ++++++++++++++++++ database/gdb/gdb_core_underlying.go | 25 +- database/gdb/gdb_core_utility.go | 31 ++- database/gdb/gdb_model.go | 1 + database/gdb/gdb_model_delete.go | 8 +- database/gdb/gdb_model_hook.go | 37 +-- database/gdb/gdb_model_insert.go | 4 +- database/gdb/gdb_model_select.go | 4 +- database/gdb/gdb_model_sharding.go | 80 ------ database/gdb/gdb_model_update.go | 4 +- database/gdb/gdb_model_with.go | 4 +- .../gdb/gdb_z_mysql_feature_sharding_test.go | 138 +++++++++++ go.mod | 1 + go.sum | 4 + 18 files changed, 481 insertions(+), 112 deletions(-) create mode 100644 database/gdb/gdb_core_sharding.go delete mode 100644 database/gdb/gdb_model_sharding.go create mode 100644 database/gdb/gdb_z_mysql_feature_sharding_test.go diff --git a/cmd/gf/go.mod b/cmd/gf/go.mod index ca9f97fe7..c644cd134 100644 --- a/cmd/gf/go.mod +++ b/cmd/gf/go.mod @@ -7,6 +7,7 @@ require ( github.com/gogf/gf/contrib/drivers/pgsql/v2 v2.0.0-rc2 github.com/gogf/gf/contrib/drivers/sqlite/v2 v2.0.0-rc2 github.com/gogf/gf/v2 v2.0.0 + github.com/longbridgeapp/sqlparser v0.3.1 // indirect github.com/olekukonko/tablewriter v0.0.5 github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2 // indirect ) diff --git a/cmd/gf/go.sum b/cmd/gf/go.sum index 4e6605816..39e7c3e21 100644 --- a/cmd/gf/go.sum +++ b/cmd/gf/go.sum @@ -22,6 +22,7 @@ github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Px github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= +github.com/go-test/deep v1.0.7/go.mod h1:QV8Hv/iy04NyLBxAdO9njL0iVPN1S4d/A3NVv1V36o8= github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY= github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -47,6 +48,8 @@ github.com/grokify/html-strip-tags-go v0.0.1/go.mod h1:2Su6romC5/1VXOQMaWL2yb618 github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/lib/pq v1.10.4 h1:SO9z7FRPzA03QhHKJrH5BXA6HU1rS4V2nIVrrNC1iYk= github.com/lib/pq v1.10.4/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/longbridgeapp/sqlparser v0.3.1 h1:iWOZWGIFgQrJRgobLXUNJdvqGRpbVXkyKUKUA5CNJBE= +github.com/longbridgeapp/sqlparser v0.3.1/go.mod h1:GIHaUq8zvYyHLCLMJJykx1CdM6LHtkUih/QaJXySSx4= github.com/mattn/go-colorable v0.1.9 h1:sqDoxXbdeALODt0DAeJCVp38ps9ZogZEAXjus69YV3U= github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= diff --git a/database/gdb/gdb.go b/database/gdb/gdb.go index a112d16cd..735656ac7 100644 --- a/database/gdb/gdb.go +++ b/database/gdb/gdb.go @@ -215,10 +215,12 @@ type Driver interface { } // Link is a common database function wrapper interface. +// Note that, any operation using `Link` will have no SQL logging. type Link interface { QueryContext(ctx context.Context, sql string, args ...interface{}) (*sql.Rows, error) ExecContext(ctx context.Context, sql string, args ...interface{}) (sql.Result, error) PrepareContext(ctx context.Context, sql string) (*sql.Stmt, error) + IsOnMaster() bool IsTransaction() bool } diff --git a/database/gdb/gdb_core_link.go b/database/gdb/gdb_core_link.go index 06ea3190b..b4fdd03aa 100644 --- a/database/gdb/gdb_core_link.go +++ b/database/gdb/gdb_core_link.go @@ -12,7 +12,8 @@ import ( // dbLink is used to implement interface Link for DB. type dbLink struct { - *sql.DB + *sql.DB // Underlying DB object. + isOnMaster bool // isOnMaster marks whether current link is operated on master node. } // txLink is used to implement interface Link for TX. @@ -21,11 +22,22 @@ type txLink struct { } // IsTransaction returns if current Link is a transaction. -func (*dbLink) IsTransaction() bool { +func (l *dbLink) IsTransaction() bool { return false } +// IsOnMaster checks and returns whether current link is operated on master node. +func (l *dbLink) IsOnMaster() bool { + return l.isOnMaster +} + // IsTransaction returns if current Link is a transaction. -func (*txLink) IsTransaction() bool { +func (l *txLink) IsTransaction() bool { + return true +} + +// IsOnMaster checks and returns whether current link is operated on master node. +// Note that, transaction operation is always operated on master node. +func (l *txLink) IsOnMaster() bool { return true } diff --git a/database/gdb/gdb_core_sharding.go b/database/gdb/gdb_core_sharding.go new file mode 100644 index 000000000..f27c26b23 --- /dev/null +++ b/database/gdb/gdb_core_sharding.go @@ -0,0 +1,228 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +package gdb + +import ( + "context" + "strings" + + "github.com/gogf/gf/v2/container/gvar" + "github.com/gogf/gf/v2/errors/gerror" + "github.com/gogf/gf/v2/os/gctx" + "github.com/longbridgeapp/sqlparser" +) + +// ShardingInput is input parameters for custom sharding handler. +type ShardingInput struct { + Table string // Current operation table name. + Schema string // Current operation schema, usually empty string which means uses default schema from configuration. + OperationData map[string]Value // Accurate readonly key-value data pairs from INSERT/UPDATE statement. + ConditionData map[string]Value // Accurate readonly key-value condition pairs from SELECT/UPDATE/DELETE statement. +} + +// ShardingOutput is output parameters for custom sharding handler. +type ShardingOutput struct { + Table string // New table name for current operation. Use empty string for no changes of table name. + Schema string // New schema name for current operation. Use empty string for using default schema from configuration. +} + +// ShardingHandler is a custom function for custom sharding table and schema for DB operation. +type ShardingHandler func(ctx context.Context, in ShardingInput) (out *ShardingOutput, err error) + +const ( + ctxKeyForShardingHandler gctx.StrKey = "ShardingHandler" +) + +// Sharding creates and returns a new model with sharding handler. +func (m *Model) Sharding(handler ShardingHandler) *Model { + var ( + ctx = m.GetCtx() + model = m.getModel() + ) + model.shardingHandler = handler + // Inject sharding handler into context. + model = model.Ctx(model.injectShardingInputCaller(ctx)) + return model +} + +// injectShardingInputCaller injects custom sharding handler into context. +func (m *Model) injectShardingInputCaller(ctx context.Context) context.Context { + if m.shardingHandler == nil { + return ctx + } + if ctx.Value(ctxKeyForShardingHandler) != nil { + return ctx + } + return context.WithValue(ctx, ctxKeyForShardingHandler, m.shardingHandler) +} + +type callShardingHandlerFromCtxInput struct { + Sql string + FormattedSql string +} + +type callShardingHandlerFromCtxOutput struct { + Sql string + Table string + Schema string +} + +func (c *Core) callShardingHandlerFromCtx( + ctx context.Context, in callShardingHandlerFromCtxInput, +) (out *callShardingHandlerFromCtxOutput, err error) { + var ( + newSql = in.Sql + ctxValue interface{} + shardingHandler ShardingHandler + ok bool + ) + if ctxValue = ctx.Value(ctxKeyForShardingHandler); ctxValue == nil { + return nil, nil + } + if shardingHandler, ok = ctxValue.(ShardingHandler); !ok { + return nil, nil + } + parsedOut, err := c.parseFormattedSql(in.FormattedSql) + if err != nil { + return nil, err + } + var shardingIn = ShardingInput{ + Table: parsedOut.Table, + Schema: c.db.GetSchema(), + OperationData: parsedOut.OperationData, + ConditionData: parsedOut.ConditionData, + } + shardingOut, err := shardingHandler(ctx, shardingIn) + if err != nil { + return nil, gerror.Wrap(err, `calling sharding handler failed`) + } + if shardingOut.Table != shardingIn.Table || shardingOut.Schema != shardingIn.Schema { + if shardingOut.Table != shardingIn.Table { + newSql, err = c.formatSqlWithNewTable(in.Sql, shardingOut.Table) + if err != nil { + return nil, err + } + } + out = &callShardingHandlerFromCtxOutput{ + Sql: newSql, + Table: shardingOut.Table, + Schema: shardingOut.Schema, + } + return out, nil + } + return nil, nil +} + +// formatSqlWithNewTable modifies given `sql` and returns a sql with new table name `table`. +func (c *Core) formatSqlWithNewTable(sql, table string) (newSql string, err error) { + parsedStmt, err := sqlparser.NewParser(strings.NewReader(sql)).ParseStatement() + if err != nil { + return "", gerror.Wrapf(err, `parse failed for SQL: %s`, sql) + } + newTable := &sqlparser.TableName{Name: &sqlparser.Ident{Name: table}} + switch stmt := parsedStmt.(type) { + case *sqlparser.SelectStatement: + stmt.FromItems = newTable + return stmt.String(), nil + case *sqlparser.InsertStatement: + stmt.TableName = newTable + return stmt.String(), nil + case *sqlparser.UpdateStatement: + stmt.TableName = newTable + return stmt.String(), nil + case *sqlparser.DeleteStatement: + stmt.TableName = newTable + return stmt.String(), nil + default: + return "", gerror.Wrapf(err, `unsupported SQL: %s`, sql) + } +} + +type parseFormattedSqlOutput struct { + Table string + OperationData map[string]Value + ConditionData map[string]Value + ParsedStmt sqlparser.Statement +} + +func (c *Core) parseFormattedSql(formattedSql string) (*parseFormattedSqlOutput, error) { + var ( + condition sqlparser.Expr + err error + out = &parseFormattedSqlOutput{ + OperationData: make(map[string]Value), + ConditionData: make(map[string]Value), + } + ) + out.ParsedStmt, err = sqlparser.NewParser(strings.NewReader(formattedSql)).ParseStatement() + if err != nil { + return nil, gerror.Wrapf(err, `parse failed for SQL: %s`, formattedSql) + } + switch stmt := out.ParsedStmt.(type) { + case *sqlparser.SelectStatement: + table, ok := stmt.FromItems.(*sqlparser.TableName) + if !ok { + return nil, gerror.Newf( + `invalid table name "%s" in SQL: %s`, + stmt.FromItems.String(), formattedSql, + ) + } + out.Table = table.TableName() + condition = stmt.Condition + case *sqlparser.InsertStatement: + out.Table = stmt.TableName.TableName() + if len(stmt.Expressions) > 0 { + names := make([]string, len(stmt.ColumnNames)) + for i, ident := range stmt.ColumnNames { + names[i] = ident.Name + } + // It just uses the first item. + for i, expr := range stmt.Expressions[0].Exprs { + c.injectDataByExpr(out.OperationData, names[i], expr) + } + } + case *sqlparser.UpdateStatement: + out.Table = stmt.TableName.TableName() + condition = stmt.Condition + if len(stmt.Assignments) > 0 { + for _, assignment := range stmt.Assignments { + if len(assignment.Columns) > 0 { + c.injectDataByExpr(out.OperationData, assignment.Columns[0].Name, assignment.Expr) + } + } + } + case *sqlparser.DeleteStatement: + out.Table = stmt.TableName.TableName() + condition = stmt.Condition + + default: + return nil, gerror.Wrapf(err, `unsupported SQL: %s`, formattedSql) + } + + err = sqlparser.Walk(sqlparser.VisitFunc(func(node sqlparser.Node) error { + if n, ok := node.(*sqlparser.BinaryExpr); ok { + if x, ok := n.X.(*sqlparser.Ident); ok { + if n.Op == sqlparser.EQ { + c.injectDataByExpr(out.ConditionData, x.Name, n.Y) + } + } + } + return nil + }), condition) + return out, err +} + +func (c *Core) injectDataByExpr(data map[string]Value, name string, expr sqlparser.Expr) { + switch exprImp := expr.(type) { + case *sqlparser.StringLit: + data[name] = gvar.New(exprImp.Value) + case *sqlparser.NumberLit: + data[name] = gvar.New(exprImp.Value) + default: + data[name] = gvar.New(exprImp.String()) + } +} diff --git a/database/gdb/gdb_core_underlying.go b/database/gdb/gdb_core_underlying.go index 60a255c9a..d9e564c16 100644 --- a/database/gdb/gdb_core_underlying.go +++ b/database/gdb/gdb_core_underlying.go @@ -129,6 +129,7 @@ func (c *Core) DoFilter(ctx context.Context, link Link, sql string, args []inter // DoCommit commits current sql and arguments to underlying sql driver. func (c *Core) DoCommit(ctx context.Context, in DoCommitInput) (out DoCommitOutput, err error) { + var ( sqlTx *sql.Tx sqlStmt *sql.Stmt @@ -137,9 +138,31 @@ func (c *Core) DoCommit(ctx context.Context, in DoCommitInput) (out DoCommitOutp stmtSqlRows *sql.Rows stmtSqlRow *sql.Row rowsAffected int64 + shardingOut *callShardingHandlerFromCtxOutput cancelFuncForTimeout context.CancelFunc + formattedSql = FormatSqlWithArgs(in.Sql, in.Args) timestampMilli1 = gtime.TimestampMilli() ) + shardingOut, err = c.callShardingHandlerFromCtx(ctx, callShardingHandlerFromCtxInput{ + Sql: in.Sql, + FormattedSql: formattedSql, + }) + if err != nil { + return + } + // Sharding handling. + if shardingOut != nil { + if shardingOut.Sql != "" { + in.Sql = shardingOut.Sql + } + // If schema changes, it here creates and uses a new DB link operation object. + if shardingOut.Schema != c.db.GetSchema() { + in.Link, err = c.db.GetCore().GetLink(ctx, in.Link.IsOnMaster(), shardingOut.Schema) + if err != nil { + return + } + } + } // Trace span start. tr := otel.GetTracerProvider().Tracer(traceInstrumentName, trace.WithInstrumentationVersion(gf.VERSION)) @@ -232,7 +255,7 @@ func (c *Core) DoCommit(ctx context.Context, in DoCommitInput) (out DoCommitOutp Sql: in.Sql, Type: in.Type, Args: in.Args, - Format: FormatSqlWithArgs(in.Sql, in.Args), + Format: formattedSql, Error: err, Start: timestampMilli1, End: timestampMilli2, diff --git a/database/gdb/gdb_core_utility.go b/database/gdb/gdb_core_utility.go index 9dd3beedd..bbfeb74f1 100644 --- a/database/gdb/gdb_core_utility.go +++ b/database/gdb/gdb_core_utility.go @@ -41,6 +41,27 @@ func DBFromCtx(ctx context.Context) DB { return nil } +// GetLink creates and returns the underlying database link object with transaction checks. +// The parameter `master` specifies whether using the master node if master-slave configured. +func (c *Core) GetLink(ctx context.Context, master bool, schema string) (Link, error) { + tx := TXFromCtx(ctx, c.db.GetGroup()) + if tx != nil { + return &txLink{tx.tx}, nil + } + if master { + link, err := c.db.GetCore().MasterLink(schema) + if err != nil { + return nil, err + } + return link, nil + } + link, err := c.db.GetCore().SlaveLink(schema) + if err != nil { + return nil, err + } + return link, nil +} + // MasterLink acts like function Master but with additional `schema` parameter specifying // the schema for the connection. It is defined for internal usage. // Also see Master. @@ -49,7 +70,10 @@ func (c *Core) MasterLink(schema ...string) (Link, error) { if err != nil { return nil, err } - return &dbLink{db}, nil + return &dbLink{ + DB: db, + isOnMaster: true, + }, nil } // SlaveLink acts like function Slave but with additional `schema` parameter specifying @@ -60,7 +84,10 @@ func (c *Core) SlaveLink(schema ...string) (Link, error) { if err != nil { return nil, err } - return &dbLink{db}, nil + return &dbLink{ + DB: db, + isOnMaster: false, + }, nil } // QuoteWord checks given string `s` a word, diff --git a/database/gdb/gdb_model.go b/database/gdb/gdb_model.go index 7c433517b..51e4bc2ae 100644 --- a/database/gdb/gdb_model.go +++ b/database/gdb/gdb_model.go @@ -266,6 +266,7 @@ func (m *Model) Clone() *Model { } else { newModel = m.db.Model(m.tablesInit) } + // Basic attributes copy. *newModel = *m // Shallow copy slice attributes. if n := len(m.extraArgs); n > 0 { diff --git a/database/gdb/gdb_model_delete.go b/database/gdb/gdb_model_delete.go index 8ba960519..68bcfdb6f 100644 --- a/database/gdb/gdb_model_delete.go +++ b/database/gdb/gdb_model_delete.go @@ -37,8 +37,8 @@ func (m *Model) Delete(where ...interface{}) (result sql.Result, err error) { in := &HookUpdateInput{ internalParamHookUpdate: internalParamHookUpdate{ internalParamHook: internalParamHook{ - db: m.db, - link: m.getLink(true), + link: m.getLink(true), + model: m, }, handler: m.hookHandler.Update, }, @@ -60,8 +60,8 @@ func (m *Model) Delete(where ...interface{}) (result sql.Result, err error) { in := &HookDeleteInput{ internalParamHookDelete: internalParamHookDelete{ internalParamHook: internalParamHook{ - db: m.db, - link: m.getLink(true), + link: m.getLink(true), + model: m, }, handler: m.hookHandler.Delete, }, diff --git a/database/gdb/gdb_model_hook.go b/database/gdb/gdb_model_hook.go index ee94eb525..6df876236 100644 --- a/database/gdb/gdb_model_hook.go +++ b/database/gdb/gdb_model_hook.go @@ -31,10 +31,10 @@ type HookHandler struct { // internalParamHook manages all internal parameters for hook operations. // The `internal` obviously means you cannot access these parameters outside this package. type internalParamHook struct { - db DB // Underlying DB object. - link Link // Connection object from third party sql driver. - handlerCalled bool // Simple mark for custom handler called, in case of recursive calling. - removedWhere bool // Removed mark for condition string that was removed `WHERE` prefix. + link Link // Connection object from third party sql driver. + model *Model // Underlying Model object. + handlerCalled bool // Simple mark for custom handler called, in case of recursive calling. + removedWhere bool // Removed mark for condition string that was removed `WHERE` prefix. } type internalParamHookSelect struct { @@ -90,13 +90,22 @@ type HookDeleteInput struct { Args []interface{} } +const ( + whereKeyInCondition = " WHERE " +) + +// IsTransaction checks and returns whether current operation is during transaction. +func (h *internalParamHook) IsTransaction() bool { + return h.link.IsTransaction() +} + // Next calls the next hook handler. func (h *HookSelectInput) Next(ctx context.Context) (result Result, err error) { if h.handler != nil && !h.handlerCalled { h.handlerCalled = true return h.handler(ctx, h) } - return h.db.DoSelect(ctx, h.link, h.Sql, h.Args...) + return h.model.db.DoSelect(ctx, h.link, h.Sql, h.Args...) } // Next calls the next hook handler. @@ -105,39 +114,39 @@ func (h *HookInsertInput) Next(ctx context.Context) (result sql.Result, err erro h.handlerCalled = true return h.handler(ctx, h) } - return h.db.DoInsert(ctx, h.link, h.Table, h.Data, h.Option) + return h.model.db.DoInsert(ctx, h.link, h.Table, h.Data, h.Option) } // Next calls the next hook handler. func (h *HookUpdateInput) Next(ctx context.Context) (result sql.Result, err error) { if h.handler != nil && !h.handlerCalled { h.handlerCalled = true - if gstr.HasPrefix(h.Condition, " WHERE ") { + if gstr.HasPrefix(h.Condition, whereKeyInCondition) { h.removedWhere = true - h.Condition = gstr.TrimLeftStr(h.Condition, " WHERE ") + h.Condition = gstr.TrimLeftStr(h.Condition, whereKeyInCondition) } return h.handler(ctx, h) } if h.removedWhere { - h.Condition = " WHERE " + h.Condition + h.Condition = whereKeyInCondition + h.Condition } - return h.db.DoUpdate(ctx, h.link, h.Table, h.Data, h.Condition, h.Args...) + return h.model.db.DoUpdate(ctx, h.link, h.Table, h.Data, h.Condition, h.Args...) } // Next calls the next hook handler. func (h *HookDeleteInput) Next(ctx context.Context) (result sql.Result, err error) { if h.handler != nil && !h.handlerCalled { h.handlerCalled = true - if gstr.HasPrefix(h.Condition, " WHERE ") { + if gstr.HasPrefix(h.Condition, whereKeyInCondition) { h.removedWhere = true - h.Condition = gstr.TrimLeftStr(h.Condition, " WHERE ") + h.Condition = gstr.TrimLeftStr(h.Condition, whereKeyInCondition) } return h.handler(ctx, h) } if h.removedWhere { - h.Condition = " WHERE " + h.Condition + h.Condition = whereKeyInCondition + h.Condition } - return h.db.DoDelete(ctx, h.link, h.Table, h.Condition, h.Args...) + return h.model.db.DoDelete(ctx, h.link, h.Table, h.Condition, h.Args...) } // Hook sets the hook functions for current model. diff --git a/database/gdb/gdb_model_insert.go b/database/gdb/gdb_model_insert.go index 077845b3e..ad2bfed60 100644 --- a/database/gdb/gdb_model_insert.go +++ b/database/gdb/gdb_model_insert.go @@ -314,8 +314,8 @@ func (m *Model) doInsertWithOption(insertOption int) (result sql.Result, err err in := &HookInsertInput{ internalParamHookInsert: internalParamHookInsert{ internalParamHook: internalParamHook{ - db: m.db, - link: m.getLink(true), + link: m.getLink(true), + model: m, }, handler: m.hookHandler.Insert, }, diff --git a/database/gdb/gdb_model_select.go b/database/gdb/gdb_model_select.go index e9e57c4ed..7d48cb28d 100644 --- a/database/gdb/gdb_model_select.go +++ b/database/gdb/gdb_model_select.go @@ -536,8 +536,8 @@ func (m *Model) doGetAllBySql(sql string, args ...interface{}) (result Result, e in := &HookSelectInput{ internalParamHookSelect: internalParamHookSelect{ internalParamHook: internalParamHook{ - db: m.db, - link: m.getLink(false), + link: m.getLink(false), + model: m, }, handler: m.hookHandler.Select, }, diff --git a/database/gdb/gdb_model_sharding.go b/database/gdb/gdb_model_sharding.go deleted file mode 100644 index 1e07971cc..000000000 --- a/database/gdb/gdb_model_sharding.go +++ /dev/null @@ -1,80 +0,0 @@ -// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. -// -// This Source Code Form is subject to the terms of the MIT License. -// If a copy of the MIT was not distributed with this file, -// You can obtain one at https://github.com/gogf/gf. - -package gdb - -import ( - "context" - "reflect" - - "github.com/gogf/gf/v2/container/gvar" - "github.com/gogf/gf/v2/errors/gerror" -) - -// ShardingInput is input parameters for custom sharding handler. -type ShardingInput struct { - Table string // Current operation table name. - Schema string // Current operation schema, usually empty string which means uses default schema from configuration. - Data map[string]Value // Accurate key-value pairs from SELECT/INSERT/UPDATE/DELETE statement. -} - -// ShardingOutput is output parameters for custom sharding handler. -type ShardingOutput struct { - Table string // New table name for current operation. Use empty string for no changes of table name. - Schema string // New schema name for current operation. Use empty string for using default schema from configuration. -} - -type ShardingHandler func(ctx context.Context, in ShardingInput) (out *ShardingOutput, err error) - -type callShardingHandlerInput struct { - Table string - InsertData List - UpdateData interface{} - Condition string - Sql string -} - -func (m *Model) callShardingHandler(ctx context.Context, in callShardingHandlerInput) (out *ShardingOutput, err error) { - if m.shardingHandler == nil { - return &ShardingOutput{}, nil - } - return -} - -func (m *Model) shardingDataFromInsertData(data List) (shardingData map[string]Value, err error) { - if len(data) == 0 { - return nil, nil - } - shardingData = make(map[string]Value) - // If given batch data(in batch insert scenario), it uses the first data. - for k, v := range data[0] { - shardingData[k] = gvar.New(v) - } - return shardingData, nil -} - -func (m *Model) shardingDataFromUpdateData(data interface{}) (shardingData map[string]Value, err error) { - shardingData = make(map[string]Value) - switch value := data.(type) { - case map[string]interface{}: - for k, v := range value { - shardingData[k] = gvar.New(v) - } - case string: - - default: - return nil, gerror.Newf(`unsupported data of type "%s" for sharding`, reflect.TypeOf(data)) - } - return -} - -func (m *Model) shardingDataFromSql(sql string, args []interface{}) (shardingData map[string]Value, err error) { - return -} - -func (m *Model) shardingDataFromCondition(condition string) (shardingData map[string]Value, err error) { - return -} diff --git a/database/gdb/gdb_model_update.go b/database/gdb/gdb_model_update.go index 5f9c6cb2d..f6a39a09f 100644 --- a/database/gdb/gdb_model_update.go +++ b/database/gdb/gdb_model_update.go @@ -79,8 +79,8 @@ func (m *Model) Update(dataAndWhere ...interface{}) (result sql.Result, err erro in := &HookUpdateInput{ internalParamHookUpdate: internalParamHookUpdate{ internalParamHook: internalParamHook{ - db: m.db, - link: m.getLink(true), + link: m.getLink(true), + model: m, }, handler: m.hookHandler.Update, }, diff --git a/database/gdb/gdb_model_with.go b/database/gdb/gdb_model_with.go index 3e953a034..b6fa46452 100644 --- a/database/gdb/gdb_model_with.go +++ b/database/gdb/gdb_model_with.go @@ -143,7 +143,7 @@ func (m *Model) doWithScanStruct(pointer interface{}) error { } // Recursively with feature checks. - model = m.db.With(field.Value).Hook(m.hook) + model = m.db.With(field.Value).Hook(m.hookHandler) if m.withAll { model = model.WithAll() } else { @@ -258,7 +258,7 @@ func (m *Model) doWithScanStructs(pointer interface{}) error { fieldKeys = structType.FieldKeys() } // Recursively with feature checks. - model = m.db.With(field.Value).Hook(m.hook) + model = m.db.With(field.Value).Hook(m.hookHandler) if m.withAll { model = model.WithAll() } else { diff --git a/database/gdb/gdb_z_mysql_feature_sharding_test.go b/database/gdb/gdb_z_mysql_feature_sharding_test.go new file mode 100644 index 000000000..aed2b1823 --- /dev/null +++ b/database/gdb/gdb_z_mysql_feature_sharding_test.go @@ -0,0 +1,138 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +package gdb_test + +import ( + "context" + "testing" + + "github.com/gogf/gf/v2/database/gdb" + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/test/gtest" +) + +func Test_Model_Sharding(t *testing.T) { + table1 := createTable() + table2 := createTable() + defer dropTable(table1) + defer dropTable(table2) + + gtest.C(t, func(t *gtest.T) { + _, err1 := db.Model(table1).Data(g.Map{ + "id": 1, + }).Insert() + t.AssertNil(err1) + _, err2 := db.Model(table2).Data(g.Map{ + "id": 2, + }).Insert() + t.AssertNil(err2) + }) + // no sharding. + gtest.C(t, func(t *gtest.T) { + all, err := db.Model(table1).All() + t.AssertNil(err) + t.Assert(len(all), 1) + t.Assert(all[0]["id"].String(), 1) + }) + // with sharding handler. + gtest.C(t, func(t *gtest.T) { + all, err := db.Model(table1).Sharding(func(ctx context.Context, in gdb.ShardingInput) (out *gdb.ShardingOutput, err error) { + out = &gdb.ShardingOutput{ + Table: table2, + } + return + }).All() + t.AssertNil(err) + t.Assert(len(all), 1) + t.Assert(all[0]["id"].String(), 2) + }) + // with sharding handler and no existence table name. + gtest.C(t, func(t *gtest.T) { + all, err := db.Model("none").Sharding(func(ctx context.Context, in gdb.ShardingInput) (out *gdb.ShardingOutput, err error) { + out = &gdb.ShardingOutput{ + Table: table2, + } + return + }).All() + t.AssertNil(err) + t.Assert(len(all), 1) + t.Assert(all[0]["id"].String(), 2) + }) + // with sharding handler and no existence table name and tables fields retrieving. + gtest.C(t, func(t *gtest.T) { + type User struct { + Id int + Passport string + Password string + NickName string + } + var users []User + err := db.Model("none").Sharding(func(ctx context.Context, in gdb.ShardingInput) (out *gdb.ShardingOutput, err error) { + out = &gdb.ShardingOutput{ + Table: table2, + } + return + }).Scan(&users) + t.AssertNil(err) + t.Assert(len(users), 1) + t.Assert(users[0].Id, 2) + }) +} + +func Test_Model_Sharding_Schema(t *testing.T) { + var ( + db1 = db + db2 = db.Schema(TestSchema2) + table1 = createTableWithDb(db1) + table2 = createTableWithDb(db2) + ) + + defer dropTableWithDb(db1, table1) + defer dropTableWithDb(db2, table2) + + gtest.C(t, func(t *gtest.T) { + _, err1 := db1.Model(table1).Data(g.Map{ + "id": 1, + }).Insert() + t.AssertNil(err1) + _, err2 := db2.Model(table2).Data(g.Map{ + "id": 2, + }).Insert() + t.AssertNil(err2) + }) + // no sharding. + gtest.C(t, func(t *gtest.T) { + all, err := db1.Model(table1).All() + t.AssertNil(err) + t.Assert(len(all), 1) + t.Assert(all[0]["id"].String(), 1) + }) + gtest.C(t, func(t *gtest.T) { + _, err := db1.Model(table2).All() + // Table not exist error. + t.AssertNE(err, nil) + }) + gtest.C(t, func(t *gtest.T) { + all, err := db2.Model(table2).All() + t.AssertNil(err) + t.Assert(len(all), 1) + t.Assert(all[0]["id"].String(), 2) + }) + // with sharding handler and no existence table name and schema change. + gtest.C(t, func(t *gtest.T) { + all, err := db1.Model("none").Sharding(func(ctx context.Context, in gdb.ShardingInput) (out *gdb.ShardingOutput, err error) { + out = &gdb.ShardingOutput{ + Table: table2, + Schema: TestSchema2, + } + return + }).All() + t.AssertNil(err) + t.Assert(len(all), 1) + t.Assert(all[0]["id"].String(), 2) + }) +} diff --git a/go.mod b/go.mod index 9bde62df0..383d8496e 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/go-sql-driver/mysql v1.6.0 github.com/gorilla/websocket v1.5.0 github.com/grokify/html-strip-tags-go v0.0.1 + github.com/longbridgeapp/sqlparser v0.3.1 github.com/olekukonko/tablewriter v0.0.5 go.opentelemetry.io/otel v1.0.0 go.opentelemetry.io/otel/sdk v1.0.0 diff --git a/go.sum b/go.sum index 01759c8a5..afcaa4dd5 100644 --- a/go.sum +++ b/go.sum @@ -20,6 +20,8 @@ github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Px github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= +github.com/go-test/deep v1.0.7 h1:/VSMRlnY/JSyqxQUzQLKVMAskpY/NZKFA5j2P+0pP2M= +github.com/go-test/deep v1.0.7/go.mod h1:QV8Hv/iy04NyLBxAdO9njL0iVPN1S4d/A3NVv1V36o8= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= @@ -41,6 +43,8 @@ github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad github.com/grokify/html-strip-tags-go v0.0.1 h1:0fThFwLbW7P/kOiTBs03FsJSV9RM2M/Q/MOnCQxKMo0= github.com/grokify/html-strip-tags-go v0.0.1/go.mod h1:2Su6romC5/1VXOQMaWL2yb618ARB8iVo6/DR99A6d78= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/longbridgeapp/sqlparser v0.3.1 h1:iWOZWGIFgQrJRgobLXUNJdvqGRpbVXkyKUKUA5CNJBE= +github.com/longbridgeapp/sqlparser v0.3.1/go.mod h1:GIHaUq8zvYyHLCLMJJykx1CdM6LHtkUih/QaJXySSx4= github.com/mattn/go-colorable v0.1.9 h1:sqDoxXbdeALODt0DAeJCVp38ps9ZogZEAXjus69YV3U= github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=