diff --git a/contrib/drivers/mysql/mysql__test.go b/contrib/drivers/mysql/mysql__test.go index 13bc81a05..97f428c4d 100644 --- a/contrib/drivers/mysql/mysql__test.go +++ b/contrib/drivers/mysql/mysql__test.go @@ -30,6 +30,7 @@ const ( var ( db gdb.DB + db2 gdb.DB dbPrefix gdb.DB dbInvalid gdb.DB ctx = context.TODO() @@ -68,6 +69,7 @@ func init() { gtest.Error(err) } db = db.Schema(TestSchema1) + db2 = db.Schema(TestSchema2) // Prefix db. if r, err := gdb.NewByGroup("prefix"); err != nil { diff --git a/contrib/drivers/mysql/mysql_feature_model_sharding_test.go b/contrib/drivers/mysql/mysql_feature_model_sharding_test.go new file mode 100644 index 000000000..9e331f3cc --- /dev/null +++ b/contrib/drivers/mysql/mysql_feature_model_sharding_test.go @@ -0,0 +1,217 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +package mysql_test + +import ( + "context" + "fmt" + "testing" + + "github.com/gogf/gf/v2/database/gdb" + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/os/gtime" + "github.com/gogf/gf/v2/test/gtest" +) + +func Test_Model_Sharding_Table(t *testing.T) { + var ( + table1 = gtime.TimestampNanoStr() + "_table1" + table2 = gtime.TimestampNanoStr() + "_table2" + ) + createTable(table1) + defer dropTable(table1) + createTable(table2) + defer dropTable(table2) + + shardingModel := db.Model(table1).Sharding( + func(ctx context.Context, in gdb.ShardingInput) (out *gdb.ShardingOutput, err error) { + out = &gdb.ShardingOutput{ + Table: table2, + Schema: in.Schema, + } + return + }, + ) + gtest.C(t, func(t *gtest.T) { + r, err := shardingModel.Insert(g.Map{ + "id": 1, + "passport": fmt.Sprintf(`user_%d`, 1), + "password": fmt.Sprintf(`pass_%d`, 1), + "nickname": fmt.Sprintf(`name_%d`, 1), + "create_time": gtime.NewFromStr(CreateTime).String(), + }) + t.AssertNil(err) + n, err := r.RowsAffected() + t.AssertNil(err) + t.Assert(n, 1) + + var count int + count, err = shardingModel.Count() + t.AssertNil(err) + t.Assert(count, 1) + + count, err = db.Model(table1).Count() + t.AssertNil(err) + t.Assert(count, 0) + + count, err = db.Model(table2).Count() + t.AssertNil(err) + t.Assert(count, 1) + }) + + gtest.C(t, func(t *gtest.T) { + r, err := shardingModel.Where(g.Map{ + "id": 1, + }).Data(g.Map{ + "passport": fmt.Sprintf(`user_%d`, 2), + "password": fmt.Sprintf(`pass_%d`, 2), + "nickname": fmt.Sprintf(`name_%d`, 2), + }).Update() + t.AssertNil(err) + n, err := r.RowsAffected() + t.AssertNil(err) + t.Assert(n, 1) + + var ( + count int + where = g.Map{"passport": fmt.Sprintf(`user_%d`, 2)} + ) + count, err = shardingModel.Where(where).Count() + t.AssertNil(err) + t.Assert(count, 1) + + count, err = db.Model(table1).Where(where).Count() + t.AssertNil(err) + t.Assert(count, 0) + + count, err = db.Model(table2).Where(where).Count() + t.AssertNil(err) + t.Assert(count, 1) + }) + + gtest.C(t, func(t *gtest.T) { + r, err := shardingModel.Where(g.Map{ + "id": 1, + }).Delete() + t.AssertNil(err) + n, err := r.RowsAffected() + t.AssertNil(err) + t.Assert(n, 1) + + var count int + count, err = shardingModel.Count() + t.AssertNil(err) + t.Assert(count, 0) + + count, err = db.Model(table1).Count() + t.AssertNil(err) + t.Assert(count, 0) + + count, err = db.Model(table2).Count() + t.AssertNil(err) + t.Assert(count, 0) + }) +} + +func Test_Model_Sharding_Schema(t *testing.T) { + var ( + table = gtime.TimestampNanoStr() + "_table" + ) + createTableWithDb(db, table) + defer dropTableWithDb(db, table) + createTableWithDb(db2, table) + defer dropTableWithDb(db2, table) + + shardingModel := db.Model(table).Sharding( + func(ctx context.Context, in gdb.ShardingInput) (out *gdb.ShardingOutput, err error) { + out = &gdb.ShardingOutput{ + Table: table, + Schema: db2.GetSchema(), + } + return + }, + ) + gtest.C(t, func(t *gtest.T) { + r, err := shardingModel.Insert(g.Map{ + "id": 1, + "passport": fmt.Sprintf(`user_%d`, 1), + "password": fmt.Sprintf(`pass_%d`, 1), + "nickname": fmt.Sprintf(`name_%d`, 1), + "create_time": gtime.NewFromStr(CreateTime).String(), + }) + t.AssertNil(err) + n, err := r.RowsAffected() + t.AssertNil(err) + t.Assert(n, 1) + + var count int + count, err = shardingModel.Count() + t.AssertNil(err) + t.Assert(count, 1) + + count, err = db.Model(table).Count() + t.AssertNil(err) + t.Assert(count, 0) + + count, err = db2.Model(table).Count() + t.AssertNil(err) + t.Assert(count, 1) + }) + + gtest.C(t, func(t *gtest.T) { + r, err := shardingModel.Where(g.Map{ + "id": 1, + }).Data(g.Map{ + "passport": fmt.Sprintf(`user_%d`, 2), + "password": fmt.Sprintf(`pass_%d`, 2), + "nickname": fmt.Sprintf(`name_%d`, 2), + }).Update() + t.AssertNil(err) + n, err := r.RowsAffected() + t.AssertNil(err) + t.Assert(n, 1) + + var ( + count int + where = g.Map{"passport": fmt.Sprintf(`user_%d`, 2)} + ) + count, err = shardingModel.Where(where).Count() + t.AssertNil(err) + t.Assert(count, 1) + + count, err = db.Model(table).Where(where).Count() + t.AssertNil(err) + t.Assert(count, 0) + + count, err = db2.Model(table).Where(where).Count() + t.AssertNil(err) + t.Assert(count, 1) + }) + + gtest.C(t, func(t *gtest.T) { + r, err := shardingModel.Where(g.Map{ + "id": 1, + }).Delete() + t.AssertNil(err) + n, err := r.RowsAffected() + t.AssertNil(err) + t.Assert(n, 1) + + var count int + count, err = shardingModel.Count() + t.AssertNil(err) + t.Assert(count, 0) + + count, err = db.Model(table).Count() + t.AssertNil(err) + t.Assert(count, 0) + + count, err = db2.Model(table).Count() + t.AssertNil(err) + t.Assert(count, 0) + }) +} diff --git a/database/gdb/gdb.go b/database/gdb/gdb.go index 5b056b293..1e0c1edad 100644 --- a/database/gdb/gdb.go +++ b/database/gdb/gdb.go @@ -318,7 +318,7 @@ type Sql struct { type DoInsertOption struct { OnDuplicateStr string // Custom string for `on duplicated` statement. OnDuplicateMap map[string]interface{} // Custom key-value map from `OnDuplicateEx` function for `on duplicated` statement. - InsertOption int // Insert operation in constant value. + InsertOption InsertOption // Insert operation in constant value. BatchCount int // Batch count for batch inserting. } @@ -384,11 +384,13 @@ const ( linkPattern = `(\w+):([\w\-]*):(.*?)@(\w+?)\((.+?)\)/{0,1}([^\?]*)\?{0,1}(.*)` ) +type InsertOption int + const ( - InsertOptionDefault = 0 - InsertOptionReplace = 1 - InsertOptionSave = 2 - InsertOptionIgnore = 3 + InsertOptionDefault InsertOption = 0 + InsertOptionReplace InsertOption = 1 + InsertOptionSave InsertOption = 2 + InsertOptionIgnore InsertOption = 3 ) const ( diff --git a/database/gdb/gdb_func.go b/database/gdb/gdb_func.go index a59bf878c..90d02470e 100644 --- a/database/gdb/gdb_func.go +++ b/database/gdb/gdb_func.go @@ -196,7 +196,7 @@ func ListItemValuesUnique(list interface{}, key string, subKey ...interface{}) [ } // GetInsertOperationByOption returns proper insert option with given parameter `option`. -func GetInsertOperationByOption(option int) string { +func GetInsertOperationByOption(option InsertOption) string { var operator string switch option { case InsertOptionReplace: diff --git a/database/gdb/gdb_model.go b/database/gdb/gdb_model.go index a4a629049..5d19c0a82 100644 --- a/database/gdb/gdb_model.go +++ b/database/gdb/gdb_model.go @@ -49,6 +49,7 @@ type Model struct { safe bool // If true, it clones and returns a new model object whenever operation done; or else it changes the attribute of current model. onDuplicate interface{} // onDuplicate is used for ON "DUPLICATE KEY UPDATE" statement. onDuplicateEx interface{} // onDuplicateEx is used for excluding some columns ON "DUPLICATE KEY UPDATE" statement. + shardingFunc ShardingFunc // shardingFunc is the custom function for records sharding. } // ModelHandler is a function that handles given Model and returns a new Model that is custom modified. diff --git a/database/gdb/gdb_model_hook.go b/database/gdb/gdb_model_hook.go index e1325c544..a2d45d871 100644 --- a/database/gdb/gdb_model_hook.go +++ b/database/gdb/gdb_model_hook.go @@ -9,7 +9,9 @@ package gdb import ( "context" "database/sql" + "fmt" + "github.com/gogf/gf/v2/text/gregex" "github.com/gogf/gf/v2/text/gstr" ) @@ -31,9 +33,11 @@ type HookHandler struct { // internalParamHook manages all internal parameters for hook operations. // The `internal` obviously means you cannot access these parameters outside this package. type internalParamHook struct { - link Link // Connection object from third party sql driver. - handlerCalled bool // Simple mark for custom handler called, in case of recursive calling. - removedWhere bool // Removed mark for condition string that was removed `WHERE` prefix. + link Link // Connection object from third party sql driver. + handlerCalled bool // Simple mark for custom handler called, in case of recursive calling. + removedWhere bool // Removed mark for condition string that was removed `WHERE` prefix. + originalTableName string // The original table name. + originalSchemaName string // The original schema name. } type internalParamHookSelect struct { @@ -61,38 +65,38 @@ type internalParamHookDelete struct { // which is usually not be interesting for upper business hook handler. type HookSelectInput struct { internalParamHookSelect - Model *Model - Table string - Sql string - Args []interface{} + Model *Model // Current operation Model, which takes no effect if updated. + Table string // The table name that to be used. Update this attribute to change target table name. + Sql string // The sql string that to be committed. + Args []interface{} // The arguments of sql. } // HookInsertInput holds the parameters for insert hook operation. type HookInsertInput struct { internalParamHookInsert - Model *Model - Table string - Data List - Option DoInsertOption + Model *Model // Current operation Model, which takes no effect if updated. + Table string // The table name that to be used. Update this attribute to change target table name. + Data List // The data records list to be inserted/saved into table. + Option DoInsertOption // The extra option for data inserting. } // HookUpdateInput holds the parameters for update hook operation. type HookUpdateInput struct { internalParamHookUpdate - Model *Model - Table string - Data interface{} // Data can be type of: map[string]interface{}/string. You can use type assertion on `Data`. - Condition string - Args []interface{} + Model *Model // Current operation Model, which takes no effect if updated. + Table string // The table name that to be used. Update this attribute to change target table name. + Data interface{} // Data can be type of: map[string]interface{}/string. You can use type assertion on `Data`. + Condition string // The where condition string for updating. + Args []interface{} // The arguments for sql place-holders. } // HookDeleteInput holds the parameters for delete hook operation. type HookDeleteInput struct { internalParamHookDelete - Model *Model - Table string - Condition string - Args []interface{} + Model *Model // Current operation Model, which takes no effect if updated. + Table string // The table name that to be used. Update this attribute to change target table name. + Condition string // The where condition string for deleting. + Args []interface{} // The arguments for sql place-holders. } const ( @@ -104,17 +108,83 @@ func (h *internalParamHook) IsTransaction() bool { return h.link.IsTransaction() } +func (h *internalParamHook) handlerSharding( + ctx context.Context, table string, model *Model, isOnMaster bool, +) (newTable string, err error) { + shardingInput := ShardingInput{ + Table: table, + Schema: model.db.GetSchema(), + } + newTable = shardingInput.Table + h.originalTableName = shardingInput.Table + h.originalSchemaName = shardingInput.Schema + if model.shardingFunc != nil { + var shardingOutput *ShardingOutput + // Call custom sharding function. + shardingOutput, err = model.shardingFunc(ctx, shardingInput) + if err != nil { + return + } + if shardingOutput != nil { + // Table sharding. + if shardingOutput.Table != "" { + newTable = shardingOutput.Table + } + // Schema sharding. + if shardingOutput.Schema != "" && shardingOutput.Schema != shardingInput.Schema { + if isOnMaster { + // Insert/Update/Delete statements on master node. + h.link, err = model.db.GetCore().MasterLink(shardingOutput.Schema) + } else { + // Select statement on slave node. + h.link, err = model.db.GetCore().SlaveLink(shardingOutput.Schema) + } + if err != nil { + return + } + } + } + } + return +} + // Next calls the next hook handler. func (h *HookSelectInput) Next(ctx context.Context) (result Result, err error) { + // Sharding feature. + if h.originalTableName == "" { + if h.Table, err = h.handlerSharding(ctx, h.Table, h.Model, false); err != nil { + return + } + } + if h.handler != nil && !h.handlerCalled { h.handlerCalled = true return h.handler(ctx, h) } - return h.Model.db.DoSelect(ctx, h.link, h.Sql, h.Args...) + var toBeCommittedSql = h.Sql + if h.Table != h.originalTableName { + // Replace table name the table name is changed by hook handler. + toBeCommittedSql, err = gregex.ReplaceStringFuncMatch( + `(?i) FROM ([\S]+)`, + toBeCommittedSql, + func(match []string) string { + charL, charR := h.Model.db.GetChars() + return fmt.Sprintf(` FROM %s%s%s`, charL, h.Table, charR) + }, + ) + } + return h.Model.db.DoSelect(ctx, h.link, toBeCommittedSql, h.Args...) } // Next calls the next hook handler. func (h *HookInsertInput) Next(ctx context.Context) (result sql.Result, err error) { + // Sharding feature. + if h.originalTableName == "" { + if h.Table, err = h.handlerSharding(ctx, h.Table, h.Model, true); err != nil { + return + } + } + if h.handler != nil && !h.handlerCalled { h.handlerCalled = true return h.handler(ctx, h) @@ -124,6 +194,13 @@ func (h *HookInsertInput) Next(ctx context.Context) (result sql.Result, err erro // Next calls the next hook handler. func (h *HookUpdateInput) Next(ctx context.Context) (result sql.Result, err error) { + // Sharding feature. + if h.originalTableName == "" { + if h.Table, err = h.handlerSharding(ctx, h.Table, h.Model, true); err != nil { + return + } + } + if h.handler != nil && !h.handlerCalled { h.handlerCalled = true if gstr.HasPrefix(h.Condition, whereKeyInCondition) { @@ -140,6 +217,13 @@ func (h *HookUpdateInput) Next(ctx context.Context) (result sql.Result, err erro // Next calls the next hook handler. func (h *HookDeleteInput) Next(ctx context.Context) (result sql.Result, err error) { + // Sharding feature. + if h.originalTableName == "" { + if h.Table, err = h.handlerSharding(ctx, h.Table, h.Model, true); err != nil { + return + } + } + if h.handler != nil && !h.handlerCalled { h.handlerCalled = true if gstr.HasPrefix(h.Condition, whereKeyInCondition) { diff --git a/database/gdb/gdb_model_insert.go b/database/gdb/gdb_model_insert.go index 131e3b3d9..8318f08c8 100644 --- a/database/gdb/gdb_model_insert.go +++ b/database/gdb/gdb_model_insert.go @@ -249,7 +249,7 @@ func (m *Model) Save(data ...interface{}) (result sql.Result, err error) { } // doInsertWithOption inserts data with option parameter. -func (m *Model) doInsertWithOption(ctx context.Context, insertOption int) (result sql.Result, err error) { +func (m *Model) doInsertWithOption(ctx context.Context, insertOption InsertOption) (result sql.Result, err error) { defer func() { if err == nil { m.checkAndRemoveSelectCache(ctx) @@ -377,7 +377,7 @@ func (m *Model) doInsertWithOption(ctx context.Context, insertOption int) (resul return in.Next(ctx) } -func (m *Model) formatDoInsertOption(insertOption int, columnNames []string) (option DoInsertOption, err error) { +func (m *Model) formatDoInsertOption(insertOption InsertOption, columnNames []string) (option DoInsertOption, err error) { option = DoInsertOption{ InsertOption: insertOption, BatchCount: m.getBatch(), diff --git a/database/gdb/gdb_model_sharding.go b/database/gdb/gdb_model_sharding.go new file mode 100644 index 000000000..b7729353d --- /dev/null +++ b/database/gdb/gdb_model_sharding.go @@ -0,0 +1,35 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +package gdb + +import "context" + +// ShardingInput holds the input parameters for sharding. +type ShardingInput struct { + Table string // The original table name. + Schema string // The original schema name. Note that this might be empty according database configuration. +} + +// ShardingOutput holds the output parameters for sharding. +type ShardingOutput struct { + Table string // The target table name. + Schema string // The target schema name. +} + +// ShardingFunc is custom function for records sharding by certain Model, which supports sharding on table and schema. +// It retrieves the original Table/Schema from ShardingInput, and returns the new Table/Schema by ShardingOutput. +// If the Table/Schema in ShardingOutput is empty string, it then ignores the returned value and uses the default +// Table/Schema of current Model to execute the sql statement. +type ShardingFunc func(ctx context.Context, in ShardingInput) (out *ShardingOutput, err error) + +// Sharding sets custom sharding function for current model. +// More info please refer to ShardingFunc. +func (m *Model) Sharding(f ShardingFunc) *Model { + model := m.getModel() + model.shardingFunc = f + return model +} diff --git a/database/gdb/gdb_z_mysql_internal_test.go b/database/gdb/gdb_z_mysql_internal_test.go index b8e2a0575..d29589626 100644 --- a/database/gdb/gdb_z_mysql_internal_test.go +++ b/database/gdb/gdb_z_mysql_internal_test.go @@ -8,9 +8,11 @@ package gdb import ( "context" + "fmt" "testing" "github.com/gogf/gf/v2/test/gtest" + "github.com/gogf/gf/v2/text/gregex" ) var ( @@ -18,6 +20,40 @@ var ( ctx = context.TODO() ) +func Test_HookSelect_Regex(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var ( + err error + toBeCommittedSql = `select * from "user" where 1=1` + ) + toBeCommittedSql, err = gregex.ReplaceStringFuncMatch( + `(?i) FROM ([\S]+)`, + toBeCommittedSql, + func(match []string) string { + + return fmt.Sprintf(` FROM "%s"`, "user_1") + }, + ) + t.AssertNil(err) + t.Assert(toBeCommittedSql, `select * FROM "user_1" where 1=1`) + }) + gtest.C(t, func(t *gtest.T) { + var ( + err error + toBeCommittedSql = `select * from user` + ) + toBeCommittedSql, err = gregex.ReplaceStringFuncMatch( + `(?i) FROM ([\S]+)`, + toBeCommittedSql, + func(match []string) string { + return fmt.Sprintf(` FROM %s`, "user_1") + }, + ) + t.AssertNil(err) + t.Assert(toBeCommittedSql, `select * FROM user_1`) + }) +} + func Test_parseConfigNodeLink_WithType(t *testing.T) { gtest.C(t, func(t *gtest.T) { node := &ConfigNode{