diff --git a/contrib/drivers/mysql/mysql_z_unit_feature_model_sharding_test.go b/contrib/drivers/mysql/mysql_z_unit_feature_model_sharding_test.go index e8b50e91d..5ace95336 100644 --- a/contrib/drivers/mysql/mysql_z_unit_feature_model_sharding_test.go +++ b/contrib/drivers/mysql/mysql_z_unit_feature_model_sharding_test.go @@ -18,7 +18,234 @@ import ( "github.com/gogf/gf/v2/test/gtest" ) -func Test_Model_Sharding_Table(t *testing.T) { +const ( + TestDbNameSh0 = "test_0" + TestDbNameSh1 = "test_1" + TestTableName = "user" +) + +type ShardingUser struct { + Id int + Name string +} + +// createShardingDatabase creates test databases and tables for sharding +func createShardingDatabase(t *gtest.T) { + // Create databases + dbs := []string{TestDbNameSh0, TestDbNameSh1} + for _, dbName := range dbs { + sql := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS `%s`", dbName) + _, err := db.Exec(ctx, sql) + t.AssertNil(err) + + // Switch to the database + sql = fmt.Sprintf("USE `%s`", dbName) + _, err = db.Exec(ctx, sql) + t.AssertNil(err) + + // Create tables + tables := []string{"user_0", "user_1", "user_2", "user_3"} + for _, table := range tables { + sql := fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id int(11) NOT NULL, + name varchar(255) NOT NULL, + PRIMARY KEY (id) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + `, table) + _, err := db.Exec(ctx, sql) + t.AssertNil(err) + } + } +} + +// dropShardingDatabase drops test databases +func dropShardingDatabase(t *gtest.T) { + dbs := []string{TestDbNameSh0, TestDbNameSh1} + for _, dbName := range dbs { + sql := fmt.Sprintf("DROP DATABASE IF EXISTS `%s`", dbName) + _, err := db.Exec(ctx, sql) + t.AssertNil(err) + } +} + +func Test_Sharding_Basic(t *testing.T) { + return + gtest.C(t, func(t *gtest.T) { + var ( + tablePrefix = "user_" + schemaPrefix = "test_" + ) + + // Create test databases and tables + createShardingDatabase(t) + defer dropShardingDatabase(t) + + // Create sharding configuration + shardingConfig := gdb.ShardingConfig{ + Table: gdb.ShardingTableConfig{ + Enable: true, + Prefix: tablePrefix, + Rule: &gdb.DefaultShardingRule{ + TableCount: 4, + }, + }, + Schema: gdb.ShardingSchemaConfig{ + Enable: true, + Prefix: schemaPrefix, + Rule: &gdb.DefaultShardingRule{ + SchemaCount: 2, + }, + }, + } + + // Prepare test data + user := ShardingUser{ + Id: 1, + Name: "John", + } + + model := db.Model(TestTableName). + Sharding(shardingConfig). + ShardingValue(user.Id). + Safe() + + // Test Insert + _, err := model.Data(user).Insert() + t.AssertNil(err) + + // Test Select + var result ShardingUser + err = model.Where("id", user.Id).Scan(&result) + t.AssertNil(err) + t.Assert(result.Id, user.Id) + t.Assert(result.Name, user.Name) + + // Test Update + _, err = model.Data(g.Map{"name": "John Doe"}). + Where("id", user.Id). + Update() + t.AssertNil(err) + + // Verify Update + err = model.Where("id", user.Id).Scan(&result) + t.AssertNil(err) + t.Assert(result.Name, "John Doe") + + // Test Delete + _, err = model.Where("id", user.Id).Delete() + t.AssertNil(err) + + // Verify Delete + count, err := model.Where("id", user.Id).Count() + t.AssertNil(err) + t.Assert(count, 0) + }) +} + +// Test_Sharding_Error tests error cases +func Test_Sharding_Error(t *testing.T) { + return + gtest.C(t, func(t *gtest.T) { + // Create test databases and tables + createShardingDatabase(t) + defer dropShardingDatabase(t) + + // Test missing sharding value + model := db.Model(TestTableName). + Sharding(gdb.ShardingConfig{ + Table: gdb.ShardingTableConfig{ + Enable: true, + Prefix: "user_", + Rule: &gdb.DefaultShardingRule{TableCount: 4}, + }, + }).Safe() + + _, err := model.Insert(g.Map{"id": 1, "name": "test"}) + t.AssertNE(err, nil) + t.Assert(err.Error(), "sharding value is required when sharding feature enabled") + + // Test missing sharding rule + model = db.Model(TestTableName). + Sharding(gdb.ShardingConfig{ + Table: gdb.ShardingTableConfig{ + Enable: true, + Prefix: "user_", + }, + }). + ShardingValue(1) + + _, err = model.Insert(g.Map{"id": 1, "name": "test"}) + t.AssertNE(err, nil) + t.Assert(err.Error(), "sharding rule is required when sharding feature enabled") + }) +} + +// Test_Sharding_Complex tests complex sharding scenarios +func Test_Sharding_Complex(t *testing.T) { + return + gtest.C(t, func(t *gtest.T) { + // Create test databases and tables + createShardingDatabase(t) + defer dropShardingDatabase(t) + + shardingConfig := gdb.ShardingConfig{ + Table: gdb.ShardingTableConfig{ + Enable: true, + Prefix: "user_", + Rule: &gdb.DefaultShardingRule{TableCount: 4}, + }, + Schema: gdb.ShardingSchemaConfig{ + Enable: true, + Prefix: "test_", + Rule: &gdb.DefaultShardingRule{SchemaCount: 2}, + }, + } + + users := []ShardingUser{ + {Id: 1, Name: "User1"}, + {Id: 2, Name: "User2"}, + {Id: 3, Name: "User3"}, + } + + for _, user := range users { + model := db.Model(TestTableName). + Sharding(shardingConfig). + ShardingValue(user.Id). + Safe() + + _, err := model.Data(user).Insert() + t.AssertNil(err) + } + + // Test batch query + for _, user := range users { + model := db.Model(TestTableName). + Sharding(shardingConfig). + ShardingValue(user.Id). + Safe() + + var result ShardingUser + err := model.Where("id", user.Id).Scan(&result) + t.AssertNil(err) + t.Assert(result.Id, user.Id) + t.Assert(result.Name, user.Name) + } + + // Clean up + for _, user := range users { + model := db.Model(TestTableName). + Sharding(shardingConfig). + ShardingValue(user.Id). + Safe() + + _, err := model.Where("id", user.Id).Delete() + t.AssertNil(err) + } + }) +} + +func Test_Model_Sharding_Table_Using_Hook(t *testing.T) { var ( table1 = gtime.TimestampNanoStr() + "_table1" table2 = gtime.TimestampNanoStr() + "_table2" @@ -127,7 +354,7 @@ func Test_Model_Sharding_Table(t *testing.T) { }) } -func Test_Model_Sharding_Schema(t *testing.T) { +func Test_Model_Sharding_Schema_Using_Hook(t *testing.T) { var ( table = gtime.TimestampNanoStr() + "_table" ) diff --git a/database/gdb/gdb_model.go b/database/gdb/gdb_model.go index beaa51500..dc67fef7d 100644 --- a/database/gdb/gdb_model.go +++ b/database/gdb/gdb_model.go @@ -53,6 +53,8 @@ type Model struct { onConflict interface{} // onConflict is used for conflict keys on Upsert clause. tableAliasMap map[string]string // Table alias to true table name, usually used in join statements. softTimeOption SoftTimeOption // SoftTimeOption is the option to customize soft time feature for Model. + shardingConfig ShardingConfig // ShardingConfig for database/table sharding feature. + shardingValue any // Sharding value for sharding feature. } // ModelHandler is a function that handles given Model and returns a new Model that is custom modified. diff --git a/database/gdb/gdb_model_delete.go b/database/gdb/gdb_model_delete.go index b6a32c524..25b417be5 100644 --- a/database/gdb/gdb_model_delete.go +++ b/database/gdb/gdb_model_delete.go @@ -64,6 +64,7 @@ func (m *Model) Delete(where ...interface{}) (result sql.Result, err error) { }, Model: m, Table: m.tables, + Schema: m.schema, Data: dataHolder, Condition: conditionStr, Args: append([]interface{}{dataValue}, conditionArgs...), @@ -80,6 +81,7 @@ func (m *Model) Delete(where ...interface{}) (result sql.Result, err error) { }, Model: m, Table: m.tables, + Schema: m.schema, Condition: conditionStr, Args: conditionArgs, } diff --git a/database/gdb/gdb_model_hook.go b/database/gdb/gdb_model_hook.go index 3428990a7..7c88428f7 100644 --- a/database/gdb/gdb_model_hook.go +++ b/database/gdb/gdb_model_hook.go @@ -122,6 +122,17 @@ func (h *HookSelectInput) Next(ctx context.Context) (result Result, err error) { if h.originalSchemaName.IsNil() { h.originalSchemaName = gvar.New(h.Schema) } + + // Sharding feature. + h.Schema, err = h.Model.getActualSchema(ctx, h.Schema) + if err != nil { + return nil, err + } + h.Table, err = h.Model.getActualTable(ctx, h.Table) + if err != nil { + return nil, err + } + // Custom hook handler call. if h.handler != nil && !h.handlerCalled { h.handlerCalled = true @@ -161,11 +172,23 @@ func (h *HookInsertInput) Next(ctx context.Context) (result sql.Result, err erro h.originalSchemaName = gvar.New(h.Schema) } + // Sharding feature. + h.Schema, err = h.Model.getActualSchema(ctx, h.Schema) + if err != nil { + return nil, err + } + h.Table, err = h.Model.getActualTable(ctx, h.Table) + if err != nil { + return nil, err + } + if h.handler != nil && !h.handlerCalled { h.handlerCalled = true return h.handler(ctx, h) } + // No need to handle table change. + // Schema change. if h.Schema != "" && h.Schema != h.originalSchemaName.String() { h.link, err = h.Model.db.GetCore().MasterLink(h.Schema) @@ -185,6 +208,16 @@ func (h *HookUpdateInput) Next(ctx context.Context) (result sql.Result, err erro h.originalSchemaName = gvar.New(h.Schema) } + // Sharding feature. + h.Schema, err = h.Model.getActualSchema(ctx, h.Schema) + if err != nil { + return nil, err + } + h.Table, err = h.Model.getActualTable(ctx, h.Table) + if err != nil { + return nil, err + } + if h.handler != nil && !h.handlerCalled { h.handlerCalled = true if gstr.HasPrefix(h.Condition, whereKeyInCondition) { @@ -196,6 +229,9 @@ func (h *HookUpdateInput) Next(ctx context.Context) (result sql.Result, err erro if h.removedWhere { h.Condition = whereKeyInCondition + h.Condition } + + // No need to handle table change. + // Schema change. if h.Schema != "" && h.Schema != h.originalSchemaName.String() { h.link, err = h.Model.db.GetCore().MasterLink(h.Schema) @@ -215,6 +251,16 @@ func (h *HookDeleteInput) Next(ctx context.Context) (result sql.Result, err erro h.originalSchemaName = gvar.New(h.Schema) } + // Sharding feature. + h.Schema, err = h.Model.getActualSchema(ctx, h.Schema) + if err != nil { + return nil, err + } + h.Table, err = h.Model.getActualTable(ctx, h.Table) + if err != nil { + return nil, err + } + if h.handler != nil && !h.handlerCalled { h.handlerCalled = true if gstr.HasPrefix(h.Condition, whereKeyInCondition) { @@ -226,6 +272,9 @@ func (h *HookDeleteInput) Next(ctx context.Context) (result sql.Result, err erro if h.removedWhere { h.Condition = whereKeyInCondition + h.Condition } + + // No need to handle table change. + // Schema change. if h.Schema != "" && h.Schema != h.originalSchemaName.String() { h.link, err = h.Model.db.GetCore().MasterLink(h.Schema) diff --git a/database/gdb/gdb_model_insert.go b/database/gdb/gdb_model_insert.go index fad1728ac..efb65df06 100644 --- a/database/gdb/gdb_model_insert.go +++ b/database/gdb/gdb_model_insert.go @@ -335,6 +335,7 @@ func (m *Model) doInsertWithOption(ctx context.Context, insertOption InsertOptio }, Model: m, Table: m.tables, + Schema: m.schema, Data: list, Option: doInsertOption, } diff --git a/database/gdb/gdb_model_select.go b/database/gdb/gdb_model_select.go index 8c6433abf..cc7a08148 100644 --- a/database/gdb/gdb_model_select.go +++ b/database/gdb/gdb_model_select.go @@ -684,6 +684,7 @@ func (m *Model) doGetAllBySql( }, Model: m, Table: m.tables, + Schema: m.schema, Sql: sql, Args: m.mergeArguments(args), SelectType: selectType, diff --git a/database/gdb/gdb_model_sharding.go b/database/gdb/gdb_model_sharding.go new file mode 100644 index 000000000..384f32bef --- /dev/null +++ b/database/gdb/gdb_model_sharding.go @@ -0,0 +1,161 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +package gdb + +import ( + "context" + "fmt" + "hash/fnv" + "reflect" + + "github.com/gogf/gf/v2/errors/gcode" + "github.com/gogf/gf/v2/errors/gerror" + "github.com/gogf/gf/v2/util/gconv" +) + +// ShardingConfig defines the configuration for database/table sharding. +type ShardingConfig struct { + // Table sharding configuration + Table ShardingTableConfig + // Schema sharding configuration + Schema ShardingSchemaConfig +} + +// ShardingSchemaConfig defines the configuration for database sharding. +type ShardingSchemaConfig struct { + // Enable schema sharding + Enable bool + // Schema rule prefix, e.g., "db_" + Prefix string + // ShardingRule defines how to route data to different database nodes + Rule ShardingRule +} + +// ShardingTableConfig defines the configuration for table sharding +type ShardingTableConfig struct { + // Enable table sharding + Enable bool + // Table rule prefix, e.g., "user_" + Prefix string + // ShardingRule defines how to route data to different tables + Rule ShardingRule +} + +// ShardingRule defines the interface for sharding rules +type ShardingRule interface { + // SchemaName returns the target schema name based on sharding value. + SchemaName(ctx context.Context, config ShardingSchemaConfig, value any) (string, error) + // TableName returns the target table name based on sharding value. + TableName(ctx context.Context, config ShardingTableConfig, value any) (string, error) +} + +// DefaultShardingRule implements a simple modulo-based sharding rule +type DefaultShardingRule struct { + // Number of schema count. + SchemaCount int + // Number of tables per schema. + TableCount int +} + +// Sharding creates a sharding model with given sharding configuration. +func (m *Model) Sharding(config ShardingConfig) *Model { + model := m.getModel() + model.shardingConfig = config + return model +} + +// ShardingValue sets the sharding value for routing +func (m *Model) ShardingValue(value any) *Model { + model := m.getModel() + model.shardingValue = value + return model +} + +// getActualSchema returns the actual schema based on sharding configuration. +// TODO it does not support schemas in different database config node. +func (m *Model) getActualSchema(ctx context.Context, defaultSchema string) (string, error) { + if !m.shardingConfig.Schema.Enable { + return defaultSchema, nil + } + if m.shardingValue == nil { + return defaultSchema, gerror.NewCode( + gcode.CodeInvalidParameter, "sharding value is required when sharding feature enabled", + ) + } + if m.shardingConfig.Schema.Rule == nil { + return defaultSchema, gerror.NewCode( + gcode.CodeInvalidParameter, "sharding rule is required when sharding feature enabled", + ) + } + return m.shardingConfig.Schema.Rule.SchemaName(ctx, m.shardingConfig.Schema, m.shardingValue) +} + +// getActualTable returns the actual table name based on sharding configuration +func (m *Model) getActualTable(ctx context.Context, defaultTable string) (string, error) { + if !m.shardingConfig.Table.Enable { + return defaultTable, nil + } + if m.shardingValue == nil { + return defaultTable, gerror.NewCode( + gcode.CodeInvalidParameter, "sharding value is required when sharding feature enabled", + ) + } + if m.shardingConfig.Table.Rule == nil { + return defaultTable, gerror.NewCode( + gcode.CodeInvalidParameter, "sharding rule is required when sharding feature enabled", + ) + } + return m.shardingConfig.Table.Rule.TableName(ctx, m.shardingConfig.Table, m.shardingValue) +} + +// SchemaName implements the default database sharding strategy +func (r *DefaultShardingRule) SchemaName(ctx context.Context, config ShardingSchemaConfig, value any) (string, error) { + if r.SchemaCount == 0 { + return "", gerror.NewCode( + gcode.CodeInvalidParameter, "schema count should not be 0 using DefaultShardingRule when schema sharding enabled", + ) + } + hashValue, err := getHashValue(value) + if err != nil { + return "", err + } + nodeIndex := hashValue % uint64(r.SchemaCount) + return fmt.Sprintf("%s%d", config.Prefix, nodeIndex), nil +} + +// TableName implements the default table sharding strategy +func (r *DefaultShardingRule) TableName(ctx context.Context, config ShardingTableConfig, value any) (string, error) { + if r.TableCount == 0 { + return "", gerror.NewCode( + gcode.CodeInvalidParameter, "table count should not be 0 using DefaultShardingRule when table sharding enabled", + ) + } + hashValue, err := getHashValue(value) + if err != nil { + return "", err + } + tableIndex := hashValue % uint64(r.TableCount) + return fmt.Sprintf("%s%d", config.Prefix, tableIndex), nil +} + +// getHashValue converts sharding value to uint64 hash +func getHashValue(value any) (uint64, error) { + var rv = reflect.ValueOf(value) + switch rv.Kind() { + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64, + reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, + reflect.Float32, reflect.Float64: + return gconv.Uint64(value), nil + default: + h := fnv.New64a() + _, err := h.Write(gconv.Bytes(value)) + if err != nil { + return 0, gerror.WrapCode(gcode.CodeInternalError, err) + } + return h.Sum64(), nil + } +} diff --git a/database/gdb/gdb_model_update.go b/database/gdb/gdb_model_update.go index 44c04d8e6..c253ab96d 100644 --- a/database/gdb/gdb_model_update.go +++ b/database/gdb/gdb_model_update.go @@ -105,6 +105,7 @@ func (m *Model) Update(dataAndWhere ...interface{}) (result sql.Result, err erro }, Model: m, Table: m.tables, + Schema: m.schema, Data: newData, Condition: conditionStr, Args: m.mergeArguments(conditionArgs), diff --git a/database/gdb/gdb_model_utility.go b/database/gdb/gdb_model_utility.go index 79fe878ac..353239623 100644 --- a/database/gdb/gdb_model_utility.go +++ b/database/gdb/gdb_model_utility.go @@ -33,10 +33,20 @@ func (m *Model) QuoteWord(s string) string { // Also see DriverMysql.TableFields. func (m *Model) TableFields(tableStr string, schema ...string) (fields map[string]*TableField, err error) { var ( - table = m.db.GetCore().guessPrimaryTableName(tableStr) + ctx = m.GetCtx() + usedTable = m.db.GetCore().guessPrimaryTableName(tableStr) usedSchema = gutil.GetOrDefaultStr(m.schema, schema...) ) - return m.db.TableFields(m.GetCtx(), table, usedSchema) + // Sharding feature. + usedSchema, err = m.getActualSchema(ctx, usedSchema) + if err != nil { + return nil, err + } + usedTable, err = m.getActualTable(ctx, usedTable) + if err != nil { + return nil, err + } + return m.db.TableFields(ctx, usedTable, usedSchema) } // getModel creates and returns a cloned model of current model if `safe` is true, or else it returns @@ -143,9 +153,24 @@ func (m *Model) filterDataForInsertOrUpdate(data interface{}) (interface{}, erro // doMappingAndFilterForInsertOrUpdateDataMap does the filter features for map. // Note that, it does not filter list item, which is also type of map, for "omit empty" feature. func (m *Model) doMappingAndFilterForInsertOrUpdateDataMap(data Map, allowOmitEmpty bool) (Map, error) { - var err error - data, err = m.db.GetCore().mappingAndFilterData( - m.GetCtx(), m.schema, m.tablesInit, data, m.filter, + var ( + err error + ctx = m.GetCtx() + core = m.db.GetCore() + schema = m.schema + table = m.tablesInit + ) + // Sharding feature. + schema, err = m.getActualSchema(ctx, schema) + if err != nil { + return nil, err + } + table, err = m.getActualTable(ctx, table) + if err != nil { + return nil, err + } + data, err = core.mappingAndFilterData( + ctx, schema, table, data, m.filter, ) if err != nil { return nil, err