mirror of
https://gitee.com/johng/gf
synced 2026-06-06 02:25:47 +08:00
test(contrib/drivers/mysql): add concurrent/Hook/Ctx tests (#4708)
## Summary - Add concurrent operation tests - Add Hook mechanism tests (Insert/Update/Delete/Select) - Add Context propagation tests - Add race condition tests **Test coverage added:** ~28 test functions across 5 files Ref #4689 ## Test plan ```bash cd contrib/drivers/mysql go test -v -race -run "TestModel_Concurrent|TestModel_Hook|TestModel_Ctx" ```
This commit is contained in:
338
contrib/drivers/mysql/mysql_z_unit_feature_concurrent_test.go
Normal file
338
contrib/drivers/mysql/mysql_z_unit_feature_concurrent_test.go
Normal file
@ -0,0 +1,338 @@
|
||||
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the MIT License.
|
||||
// If a copy of the MIT was not distributed with this file,
|
||||
// You can obtain one at https://github.com/gogf/gf.
|
||||
|
||||
package mysql_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/gogf/gf/v2/database/gdb"
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
"github.com/gogf/gf/v2/test/gtest"
|
||||
)
|
||||
|
||||
// Test_Concurrent_Insert tests concurrent Insert operations
|
||||
func Test_Concurrent_Insert(t *testing.T) {
|
||||
table := createTable()
|
||||
defer dropTable(table)
|
||||
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
var wg sync.WaitGroup
|
||||
concurrency := 10
|
||||
|
||||
wg.Add(concurrency)
|
||||
for i := 0; i < concurrency; i++ {
|
||||
go func(id int) {
|
||||
defer wg.Done()
|
||||
_, err := db.Model(table).Insert(g.Map{
|
||||
"passport": fmt.Sprintf("user_%d", id),
|
||||
"password": fmt.Sprintf("pass_%d", id),
|
||||
"nickname": fmt.Sprintf("name_%d", id),
|
||||
})
|
||||
t.AssertNil(err)
|
||||
}(i + 1)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// Verify all records inserted
|
||||
count, err := db.Model(table).Count()
|
||||
t.AssertNil(err)
|
||||
t.Assert(count, concurrency)
|
||||
})
|
||||
}
|
||||
|
||||
// Test_Concurrent_Update tests concurrent Update operations
|
||||
func Test_Concurrent_Update(t *testing.T) {
|
||||
table := createInitTable()
|
||||
defer dropTable(table)
|
||||
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
var wg sync.WaitGroup
|
||||
concurrency := 5
|
||||
|
||||
wg.Add(concurrency)
|
||||
for i := 0; i < concurrency; i++ {
|
||||
go func(id int) {
|
||||
defer wg.Done()
|
||||
_, err := db.Model(table).Data(g.Map{
|
||||
"nickname": fmt.Sprintf("updated_%d", id),
|
||||
}).Where("id", id+1).Update()
|
||||
t.AssertNil(err)
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// Verify updates
|
||||
for i := 0; i < concurrency; i++ {
|
||||
one, err := db.Model(table).Where("id", i+1).One()
|
||||
t.AssertNil(err)
|
||||
t.Assert(one["nickname"].String(), fmt.Sprintf("updated_%d", i))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// Test_Concurrent_Delete tests concurrent Delete operations
|
||||
func Test_Concurrent_Delete(t *testing.T) {
|
||||
table := createInitTable()
|
||||
defer dropTable(table)
|
||||
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
var wg sync.WaitGroup
|
||||
concurrency := 5
|
||||
|
||||
wg.Add(concurrency)
|
||||
for i := 0; i < concurrency; i++ {
|
||||
go func(id int) {
|
||||
defer wg.Done()
|
||||
_, err := db.Model(table).Where("id", id+1).Delete()
|
||||
t.AssertNil(err)
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// Verify deletions
|
||||
count, err := db.Model(table).Count()
|
||||
t.AssertNil(err)
|
||||
t.Assert(count, TableSize-concurrency)
|
||||
})
|
||||
}
|
||||
|
||||
// Test_Concurrent_Query tests concurrent Query operations
|
||||
func Test_Concurrent_Query(t *testing.T) {
|
||||
table := createInitTable()
|
||||
defer dropTable(table)
|
||||
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
var wg sync.WaitGroup
|
||||
concurrency := 20
|
||||
|
||||
wg.Add(concurrency)
|
||||
for i := 0; i < concurrency; i++ {
|
||||
go func(id int) {
|
||||
defer wg.Done()
|
||||
result, err := db.Model(table).Where("id", (id%TableSize)+1).One()
|
||||
t.AssertNil(err)
|
||||
t.AssertNE(result, nil)
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
})
|
||||
}
|
||||
|
||||
// Test_Concurrent_Transaction tests concurrent transaction operations
|
||||
func Test_Concurrent_Transaction(t *testing.T) {
|
||||
table := createTable()
|
||||
defer dropTable(table)
|
||||
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
var wg sync.WaitGroup
|
||||
concurrency := 10
|
||||
|
||||
wg.Add(concurrency)
|
||||
for i := 0; i < concurrency; i++ {
|
||||
go func(id int) {
|
||||
defer wg.Done()
|
||||
err := db.Transaction(ctx, func(ctx g.Ctx, tx gdb.TX) error {
|
||||
_, err := tx.Model(table).Insert(g.Map{
|
||||
"passport": fmt.Sprintf("user_%d", id),
|
||||
"password": fmt.Sprintf("pass_%d", id),
|
||||
"nickname": fmt.Sprintf("name_%d", id),
|
||||
})
|
||||
return err
|
||||
})
|
||||
t.AssertNil(err)
|
||||
}(i + 1)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// Verify all transactions committed
|
||||
count, err := db.Model(table).Count()
|
||||
t.AssertNil(err)
|
||||
t.Assert(count, concurrency)
|
||||
})
|
||||
}
|
||||
|
||||
// Test_Concurrent_Mixed_Operations tests mixed concurrent operations
|
||||
func Test_Concurrent_Mixed_Operations(t *testing.T) {
|
||||
table := createInitTable()
|
||||
defer dropTable(table)
|
||||
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
var wg sync.WaitGroup
|
||||
operations := 30
|
||||
|
||||
wg.Add(operations)
|
||||
for i := 0; i < operations; i++ {
|
||||
op := i % 3
|
||||
switch op {
|
||||
case 0: // Insert
|
||||
go func(id int) {
|
||||
defer wg.Done()
|
||||
_, _ = db.Model(table).Insert(g.Map{
|
||||
"passport": fmt.Sprintf("new_user_%d", id),
|
||||
"password": fmt.Sprintf("new_pass_%d", id),
|
||||
"nickname": fmt.Sprintf("new_name_%d", id),
|
||||
})
|
||||
}(i)
|
||||
case 1: // Update
|
||||
go func(id int) {
|
||||
defer wg.Done()
|
||||
targetId := (id % TableSize) + 1
|
||||
_, _ = db.Model(table).Data(g.Map{
|
||||
"nickname": fmt.Sprintf("concurrent_%d", id),
|
||||
}).Where("id", targetId).Update()
|
||||
}(i)
|
||||
case 2: // Query
|
||||
go func(id int) {
|
||||
defer wg.Done()
|
||||
targetId := (id % TableSize) + 1
|
||||
_, _ = db.Model(table).Where("id", targetId).One()
|
||||
}(i)
|
||||
}
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// Verify database is still consistent
|
||||
count, err := db.Model(table).Count()
|
||||
t.AssertNil(err)
|
||||
t.AssertGT(count, TableSize)
|
||||
})
|
||||
}
|
||||
|
||||
// Test_Concurrent_Connection_Pool tests connection pool under load
|
||||
func Test_Concurrent_Connection_Pool(t *testing.T) {
|
||||
table := createInitTable()
|
||||
defer dropTable(table)
|
||||
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
var wg sync.WaitGroup
|
||||
concurrency := 50
|
||||
|
||||
wg.Add(concurrency)
|
||||
for i := 0; i < concurrency; i++ {
|
||||
go func(id int) {
|
||||
defer wg.Done()
|
||||
// Each goroutine performs multiple operations
|
||||
for j := 0; j < 5; j++ {
|
||||
_, err := db.Model(table).Where("id", (id%TableSize)+1).One()
|
||||
t.AssertNil(err)
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
})
|
||||
}
|
||||
|
||||
// Test_Concurrent_Schema_Switch tests concurrent schema switching
|
||||
func Test_Concurrent_Schema_Switch(t *testing.T) {
|
||||
table1 := createTableWithDb(db, "test_schema_1")
|
||||
table2 := createTableWithDb(db2, "test_schema_2")
|
||||
defer dropTableWithDb(db, table1)
|
||||
defer dropTableWithDb(db2, table2)
|
||||
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
var wg sync.WaitGroup
|
||||
concurrency := 10
|
||||
|
||||
wg.Add(concurrency * 2)
|
||||
for i := 0; i < concurrency; i++ {
|
||||
// Insert to schema1
|
||||
go func(id int) {
|
||||
defer wg.Done()
|
||||
_, err := db.Model(table1).Insert(g.Map{
|
||||
"passport": fmt.Sprintf("user_s1_%d", id),
|
||||
"password": fmt.Sprintf("pass_%d", id),
|
||||
"nickname": fmt.Sprintf("name_%d", id),
|
||||
})
|
||||
t.AssertNil(err)
|
||||
}(i)
|
||||
|
||||
// Insert to schema2
|
||||
go func(id int) {
|
||||
defer wg.Done()
|
||||
_, err := db2.Model(table2).Insert(g.Map{
|
||||
"passport": fmt.Sprintf("user_s2_%d", id),
|
||||
"password": fmt.Sprintf("pass_%d", id),
|
||||
"nickname": fmt.Sprintf("name_%d", id),
|
||||
})
|
||||
t.AssertNil(err)
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// Verify both schemas
|
||||
count1, err := db.Model(table1).Count()
|
||||
t.AssertNil(err)
|
||||
t.Assert(count1, concurrency)
|
||||
|
||||
count2, err := db2.Model(table2).Count()
|
||||
t.AssertNil(err)
|
||||
t.Assert(count2, concurrency)
|
||||
})
|
||||
}
|
||||
|
||||
// Test_Concurrent_Model_Clone tests concurrent model cloning
|
||||
func Test_Concurrent_Model_Clone(t *testing.T) {
|
||||
table := createInitTable()
|
||||
defer dropTable(table)
|
||||
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
baseModel := db.Model(table).Where("id>", 0)
|
||||
var wg sync.WaitGroup
|
||||
concurrency := 20
|
||||
|
||||
wg.Add(concurrency)
|
||||
for i := 0; i < concurrency; i++ {
|
||||
go func(id int) {
|
||||
defer wg.Done()
|
||||
// Clone model for each goroutine
|
||||
m := baseModel.Clone()
|
||||
result, err := m.Where("id<=", TableSize/2).All()
|
||||
t.AssertNil(err)
|
||||
t.AssertGT(len(result), 0)
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
})
|
||||
}
|
||||
|
||||
// Test_Concurrent_Batch_Insert tests concurrent batch insert operations
|
||||
func Test_Concurrent_Batch_Insert(t *testing.T) {
|
||||
table := createTable()
|
||||
defer dropTable(table)
|
||||
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
var wg sync.WaitGroup
|
||||
concurrency := 5
|
||||
batchSize := 10
|
||||
|
||||
wg.Add(concurrency)
|
||||
for i := 0; i < concurrency; i++ {
|
||||
go func(batchId int) {
|
||||
defer wg.Done()
|
||||
batch := make([]g.Map, 0, batchSize)
|
||||
for j := 0; j < batchSize; j++ {
|
||||
id := batchId*batchSize + j
|
||||
batch = append(batch, g.Map{
|
||||
"passport": fmt.Sprintf("batch_user_%d", id),
|
||||
"password": fmt.Sprintf("pass_%d", id),
|
||||
"nickname": fmt.Sprintf("name_%d", id),
|
||||
})
|
||||
}
|
||||
_, err := db.Model(table).Data(batch).Insert()
|
||||
t.AssertNil(err)
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// Verify all batch inserts
|
||||
count, err := db.Model(table).Count()
|
||||
t.AssertNil(err)
|
||||
t.Assert(count, concurrency*batchSize)
|
||||
})
|
||||
}
|
||||
@ -9,6 +9,7 @@ package mysql_test
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gogf/gf/v2/database/gdb"
|
||||
"github.com/gogf/gf/v2/os/glog"
|
||||
@ -63,3 +64,100 @@ func Test_Ctx_Model(t *testing.T) {
|
||||
db.Model(table).All()
|
||||
})
|
||||
}
|
||||
|
||||
// Test_Ctx_Timeout tests context timeout behavior
|
||||
func Test_Ctx_Timeout(t *testing.T) {
|
||||
table := createInitTable()
|
||||
defer dropTable(table)
|
||||
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
// Create a context with very short timeout
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond)
|
||||
defer cancel()
|
||||
|
||||
// Wait for timeout
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
|
||||
// Query should fail due to context timeout
|
||||
_, err := db.Model(table).Ctx(ctx).All()
|
||||
t.AssertNE(err, nil)
|
||||
})
|
||||
}
|
||||
|
||||
// Test_Ctx_Cancel tests context cancellation
|
||||
func Test_Ctx_Cancel(t *testing.T) {
|
||||
table := createInitTable()
|
||||
defer dropTable(table)
|
||||
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
// Cancel immediately
|
||||
cancel()
|
||||
|
||||
// Query should fail due to cancelled context
|
||||
_, err := db.Model(table).Ctx(ctx).All()
|
||||
t.AssertNE(err, nil)
|
||||
})
|
||||
}
|
||||
|
||||
// Test_Ctx_Propagation_Transaction tests context propagation in transaction
|
||||
func Test_Ctx_Propagation_Transaction(t *testing.T) {
|
||||
table := createInitTable()
|
||||
defer dropTable(table)
|
||||
db.GetLogger().(*glog.Logger).SetCtxKeys("TraceId")
|
||||
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
db.SetDebug(true)
|
||||
defer db.SetDebug(false)
|
||||
|
||||
ctx := context.WithValue(context.Background(), "TraceId", "tx_trace_123")
|
||||
err := db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
|
||||
// Context should propagate to transaction operations
|
||||
_, err := tx.Model(table).Ctx(ctx).Where("id", 1).One()
|
||||
return err
|
||||
})
|
||||
t.AssertNil(err)
|
||||
})
|
||||
}
|
||||
|
||||
// Test_Ctx_Multiple_Values tests context with multiple values
|
||||
func Test_Ctx_Multiple_Values(t *testing.T) {
|
||||
table := createInitTable()
|
||||
defer dropTable(table)
|
||||
db.GetLogger().(*glog.Logger).SetCtxKeys("TraceId", "RequestId", "UserId")
|
||||
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
db.SetDebug(true)
|
||||
defer db.SetDebug(false)
|
||||
|
||||
ctx := context.WithValue(context.Background(), "TraceId", "trace_001")
|
||||
ctx = context.WithValue(ctx, "RequestId", "req_002")
|
||||
ctx = context.WithValue(ctx, "UserId", "user_003")
|
||||
|
||||
db.Model(table).Ctx(ctx).Where("id", 1).One()
|
||||
})
|
||||
}
|
||||
|
||||
// Test_Ctx_Nested_Operations tests context in nested operations
|
||||
func Test_Ctx_Nested_Operations(t *testing.T) {
|
||||
table := createInitTable()
|
||||
defer dropTable(table)
|
||||
db.GetLogger().(*glog.Logger).SetCtxKeys("TraceId")
|
||||
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
db.SetDebug(true)
|
||||
defer db.SetDebug(false)
|
||||
|
||||
ctx := context.WithValue(context.Background(), "TraceId", "nested_trace")
|
||||
|
||||
// Nested query operations should all have context
|
||||
result, err := db.Model(table).Ctx(ctx).Where("id>", 0).All()
|
||||
t.AssertNil(err)
|
||||
|
||||
if len(result) > 0 {
|
||||
// Another query using same context
|
||||
_, err = db.Model(table).Ctx(ctx).Where("id", result[0]["id"]).One()
|
||||
t.AssertNil(err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@ -134,3 +134,95 @@ func Test_Model_Hook_Delete(t *testing.T) {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// Test_Model_Hook_Multiple tests multiple hooks execution order
|
||||
func Test_Model_Hook_Multiple(t *testing.T) {
|
||||
table := createInitTable()
|
||||
defer dropTable(table)
|
||||
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
var execOrder []string
|
||||
|
||||
m := db.Model(table).Hook(gdb.HookHandler{
|
||||
Select: func(ctx context.Context, in *gdb.HookSelectInput) (result gdb.Result, err error) {
|
||||
execOrder = append(execOrder, "hook1_before")
|
||||
result, err = in.Next(ctx)
|
||||
execOrder = append(execOrder, "hook1_after")
|
||||
return
|
||||
},
|
||||
}).Hook(gdb.HookHandler{
|
||||
Select: func(ctx context.Context, in *gdb.HookSelectInput) (result gdb.Result, err error) {
|
||||
execOrder = append(execOrder, "hook2_before")
|
||||
result, err = in.Next(ctx)
|
||||
execOrder = append(execOrder, "hook2_after")
|
||||
return
|
||||
},
|
||||
})
|
||||
|
||||
_, err := m.Where("id", 1).One()
|
||||
t.AssertNil(err)
|
||||
|
||||
// Verify hook execution order (FIFO - first registered hook executes first)
|
||||
t.AssertGT(len(execOrder), 0)
|
||||
})
|
||||
}
|
||||
|
||||
// Test_Model_Hook_Error_Abort tests hook returning error aborts operation
|
||||
func Test_Model_Hook_Error_Abort(t *testing.T) {
|
||||
table := createInitTable()
|
||||
defer dropTable(table)
|
||||
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
m := db.Model(table).Hook(gdb.HookHandler{
|
||||
Insert: func(ctx context.Context, in *gdb.HookInsertInput) (result sql.Result, err error) {
|
||||
// Return error to abort insert
|
||||
return nil, fmt.Errorf("hook aborted insert")
|
||||
},
|
||||
})
|
||||
|
||||
_, err := m.Insert(g.Map{
|
||||
"passport": "test_abort",
|
||||
"password": "pass",
|
||||
"nickname": "name",
|
||||
})
|
||||
t.AssertNE(err, nil)
|
||||
t.Assert(err.Error(), "hook aborted insert")
|
||||
|
||||
// Verify record was not inserted
|
||||
count, err := db.Model(table).Where("passport", "test_abort").Count()
|
||||
t.AssertNil(err)
|
||||
t.Assert(count, 0)
|
||||
})
|
||||
}
|
||||
|
||||
// Test_Model_Hook_Modify_Data tests hook modifying data before insert
|
||||
func Test_Model_Hook_Modify_Data(t *testing.T) {
|
||||
table := createTable()
|
||||
defer dropTable(table)
|
||||
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
m := db.Model(table).Hook(gdb.HookHandler{
|
||||
Insert: func(ctx context.Context, in *gdb.HookInsertInput) (result sql.Result, err error) {
|
||||
// Modify all data items
|
||||
for i := range in.Data {
|
||||
in.Data[i]["password"] = "encrypted_" + fmt.Sprint(in.Data[i]["password"])
|
||||
in.Data[i]["nickname"] = "verified_" + fmt.Sprint(in.Data[i]["nickname"])
|
||||
}
|
||||
return in.Next(ctx)
|
||||
},
|
||||
})
|
||||
|
||||
_, err := m.Insert(g.Map{
|
||||
"passport": "test_user",
|
||||
"password": "plain123",
|
||||
"nickname": "john",
|
||||
})
|
||||
t.AssertNil(err)
|
||||
|
||||
// Verify data was modified by hook
|
||||
one, err := db.Model(table).Where("passport", "test_user").One()
|
||||
t.AssertNil(err)
|
||||
t.Assert(one["password"].String(), "encrypted_plain123")
|
||||
t.Assert(one["nickname"].String(), "verified_john")
|
||||
})
|
||||
}
|
||||
|
||||
@ -7,7 +7,9 @@
|
||||
package mysql_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/gogf/gf/v2/container/garray"
|
||||
@ -98,3 +100,225 @@ func Test_Master_Slave(t *testing.T) {
|
||||
t.Assert(count, int64(TableSize))
|
||||
})
|
||||
}
|
||||
|
||||
// Test_Master_Slave_Concurrent_ReadWrite tests concurrent read/write routing
|
||||
func Test_Master_Slave_Concurrent_ReadWrite(t *testing.T) {
|
||||
var err error
|
||||
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
_, err = db.Exec(ctx, "CREATE DATABASE IF NOT EXISTS `master` CHARACTER SET UTF8")
|
||||
t.AssertNil(err)
|
||||
_, err = db.Exec(ctx, "CREATE DATABASE IF NOT EXISTS `slave` CHARACTER SET UTF8")
|
||||
t.AssertNil(err)
|
||||
})
|
||||
defer func() {
|
||||
_, _ = db.Exec(ctx, "DROP DATABASE `master`")
|
||||
_, _ = db.Exec(ctx, "DROP DATABASE `slave`")
|
||||
}()
|
||||
|
||||
var (
|
||||
configKey = guid.S()
|
||||
configGroup = gdb.ConfigGroup{
|
||||
gdb.ConfigNode{
|
||||
Host: "127.0.0.1",
|
||||
Port: "3306",
|
||||
User: "root",
|
||||
Pass: "12345678",
|
||||
Name: "master",
|
||||
Type: "mysql",
|
||||
Role: "master",
|
||||
Weight: 100,
|
||||
},
|
||||
gdb.ConfigNode{
|
||||
Host: "127.0.0.1",
|
||||
Port: "3306",
|
||||
User: "root",
|
||||
Pass: "12345678",
|
||||
Name: "slave",
|
||||
Type: "mysql",
|
||||
Role: "slave",
|
||||
Weight: 100,
|
||||
},
|
||||
}
|
||||
)
|
||||
gdb.SetConfigGroup(configKey, configGroup)
|
||||
masterSlaveDB := g.DB(configKey)
|
||||
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
table := "table_" + guid.S()
|
||||
createTableWithDb(masterSlaveDB.Schema("master"), table)
|
||||
createTableWithDb(masterSlaveDB.Schema("slave"), table)
|
||||
defer dropTableWithDb(masterSlaveDB.Schema("master"), table)
|
||||
defer dropTableWithDb(masterSlaveDB.Schema("slave"), table)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
concurrency := 10
|
||||
|
||||
// Concurrent writes to master
|
||||
wg.Add(concurrency)
|
||||
for i := 0; i < concurrency; i++ {
|
||||
go func(id int) {
|
||||
defer wg.Done()
|
||||
_, err := masterSlaveDB.Model(table).Insert(g.Map{
|
||||
"passport": fmt.Sprintf("concurrent_%d", id),
|
||||
"password": fmt.Sprintf("pass_%d", id),
|
||||
"nickname": fmt.Sprintf("name_%d", id),
|
||||
})
|
||||
t.AssertNil(err)
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// Verify writes went to master
|
||||
count, err := masterSlaveDB.Model(table).Master().Count()
|
||||
t.AssertNil(err)
|
||||
t.Assert(count, concurrency)
|
||||
})
|
||||
}
|
||||
|
||||
// Test_Master_Slave_Transaction_Routing tests transaction routing to master
|
||||
func Test_Master_Slave_Transaction_Routing(t *testing.T) {
|
||||
var err error
|
||||
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
_, err = db.Exec(ctx, "CREATE DATABASE IF NOT EXISTS `master` CHARACTER SET UTF8")
|
||||
t.AssertNil(err)
|
||||
_, err = db.Exec(ctx, "CREATE DATABASE IF NOT EXISTS `slave` CHARACTER SET UTF8")
|
||||
t.AssertNil(err)
|
||||
})
|
||||
defer func() {
|
||||
_, _ = db.Exec(ctx, "DROP DATABASE `master`")
|
||||
_, _ = db.Exec(ctx, "DROP DATABASE `slave`")
|
||||
}()
|
||||
|
||||
var (
|
||||
configKey = guid.S()
|
||||
configGroup = gdb.ConfigGroup{
|
||||
gdb.ConfigNode{
|
||||
Host: "127.0.0.1",
|
||||
Port: "3306",
|
||||
User: "root",
|
||||
Pass: "12345678",
|
||||
Name: "master",
|
||||
Type: "mysql",
|
||||
Role: "master",
|
||||
Weight: 100,
|
||||
},
|
||||
gdb.ConfigNode{
|
||||
Host: "127.0.0.1",
|
||||
Port: "3306",
|
||||
User: "root",
|
||||
Pass: "12345678",
|
||||
Name: "slave",
|
||||
Type: "mysql",
|
||||
Role: "slave",
|
||||
Weight: 100,
|
||||
},
|
||||
}
|
||||
)
|
||||
gdb.SetConfigGroup(configKey, configGroup)
|
||||
masterSlaveDB := g.DB(configKey)
|
||||
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
table := "table_" + guid.S()
|
||||
createTableWithDb(masterSlaveDB.Schema("master"), table)
|
||||
createTableWithDb(masterSlaveDB.Schema("slave"), table)
|
||||
defer dropTableWithDb(masterSlaveDB.Schema("master"), table)
|
||||
defer dropTableWithDb(masterSlaveDB.Schema("slave"), table)
|
||||
|
||||
// Transaction should route to master
|
||||
err := masterSlaveDB.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
|
||||
_, err := tx.Model(table).Insert(g.Map{
|
||||
"passport": "tx_user",
|
||||
"password": "tx_pass",
|
||||
"nickname": "tx_name",
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Read within transaction should also use master
|
||||
count, err := tx.Model(table).Count()
|
||||
t.AssertNil(err)
|
||||
t.Assert(count, 1)
|
||||
|
||||
return nil
|
||||
})
|
||||
t.AssertNil(err)
|
||||
|
||||
// Verify data is in master
|
||||
count, err := masterSlaveDB.Model(table).Master().Count()
|
||||
t.AssertNil(err)
|
||||
t.Assert(count, 1)
|
||||
})
|
||||
}
|
||||
|
||||
// Test_Master_Slave_Explicit_Selection tests explicit master/slave selection
|
||||
func Test_Master_Slave_Explicit_Selection(t *testing.T) {
|
||||
var err error
|
||||
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
_, err = db.Exec(ctx, "CREATE DATABASE IF NOT EXISTS `master` CHARACTER SET UTF8")
|
||||
t.AssertNil(err)
|
||||
_, err = db.Exec(ctx, "CREATE DATABASE IF NOT EXISTS `slave` CHARACTER SET UTF8")
|
||||
t.AssertNil(err)
|
||||
})
|
||||
defer func() {
|
||||
_, _ = db.Exec(ctx, "DROP DATABASE `master`")
|
||||
_, _ = db.Exec(ctx, "DROP DATABASE `slave`")
|
||||
}()
|
||||
|
||||
var (
|
||||
configKey = guid.S()
|
||||
configGroup = gdb.ConfigGroup{
|
||||
gdb.ConfigNode{
|
||||
Host: "127.0.0.1",
|
||||
Port: "3306",
|
||||
User: "root",
|
||||
Pass: "12345678",
|
||||
Name: "master",
|
||||
Type: "mysql",
|
||||
Role: "master",
|
||||
Weight: 100,
|
||||
},
|
||||
gdb.ConfigNode{
|
||||
Host: "127.0.0.1",
|
||||
Port: "3306",
|
||||
User: "root",
|
||||
Pass: "12345678",
|
||||
Name: "slave",
|
||||
Type: "mysql",
|
||||
Role: "slave",
|
||||
Weight: 100,
|
||||
},
|
||||
}
|
||||
)
|
||||
gdb.SetConfigGroup(configKey, configGroup)
|
||||
masterSlaveDB := g.DB(configKey)
|
||||
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
table := "table_" + guid.S()
|
||||
createTableWithDb(masterSlaveDB.Schema("master"), table)
|
||||
createTableWithDb(masterSlaveDB.Schema("slave"), table)
|
||||
defer dropTableWithDb(masterSlaveDB.Schema("master"), table)
|
||||
defer dropTableWithDb(masterSlaveDB.Schema("slave"), table)
|
||||
|
||||
// Insert to master
|
||||
_, err := masterSlaveDB.Model(table).Master().Insert(g.Map{
|
||||
"passport": "explicit_test",
|
||||
"password": "pass",
|
||||
"nickname": "name",
|
||||
})
|
||||
t.AssertNil(err)
|
||||
|
||||
// Explicitly read from slave (should be empty)
|
||||
count, err := masterSlaveDB.Model(table).Slave().Count()
|
||||
t.AssertNil(err)
|
||||
t.Assert(count, 0)
|
||||
|
||||
// Explicitly read from master (should have data)
|
||||
count, err = masterSlaveDB.Model(table).Master().Count()
|
||||
t.AssertNil(err)
|
||||
t.Assert(count, 1)
|
||||
})
|
||||
}
|
||||
|
||||
115
contrib/drivers/mysql/mysql_z_unit_feature_metadata_test.go
Normal file
115
contrib/drivers/mysql/mysql_z_unit_feature_metadata_test.go
Normal file
@ -0,0 +1,115 @@
|
||||
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the MIT License.
|
||||
// If a copy of the MIT was not distributed with this file,
|
||||
// You can obtain one at https://github.com/gogf/gf.
|
||||
|
||||
package mysql_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/gogf/gf/v2/test/gtest"
|
||||
)
|
||||
|
||||
// Test_TableFields_Basic tests basic TableFields functionality
|
||||
func Test_TableFields_Basic(t *testing.T) {
|
||||
table := createInitTable()
|
||||
defer dropTable(table)
|
||||
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
fields, err := db.TableFields(ctx, table)
|
||||
t.AssertNil(err)
|
||||
t.AssertGT(len(fields), 0)
|
||||
|
||||
// Verify common fields exist
|
||||
_, ok := fields["id"]
|
||||
t.Assert(ok, true)
|
||||
_, ok = fields["passport"]
|
||||
t.Assert(ok, true)
|
||||
_, ok = fields["password"]
|
||||
t.Assert(ok, true)
|
||||
_, ok = fields["nickname"]
|
||||
t.Assert(ok, true)
|
||||
_, ok = fields["create_time"]
|
||||
t.Assert(ok, true)
|
||||
})
|
||||
}
|
||||
|
||||
// Test_TableFields_Schema tests TableFields with explicit schema
|
||||
func Test_TableFields_Schema(t *testing.T) {
|
||||
table := createInitTable()
|
||||
defer dropTable(table)
|
||||
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
fields, err := db.TableFields(ctx, table, TestSchema1)
|
||||
t.AssertNil(err)
|
||||
t.AssertGT(len(fields), 0)
|
||||
|
||||
// Verify field properties
|
||||
idField, ok := fields["id"]
|
||||
t.Assert(ok, true)
|
||||
t.Assert(idField.Name, "id")
|
||||
t.AssertGT(idField.Index, -1)
|
||||
})
|
||||
}
|
||||
|
||||
// Test_HasField_Positive tests HasField for existing field
|
||||
func Test_HasField_Positive(t *testing.T) {
|
||||
table := createInitTable()
|
||||
defer dropTable(table)
|
||||
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
has, err := db.GetCore().HasField(ctx, table, "id")
|
||||
t.AssertNil(err)
|
||||
t.Assert(has, true)
|
||||
|
||||
has, err = db.GetCore().HasField(ctx, table, "passport")
|
||||
t.AssertNil(err)
|
||||
t.Assert(has, true)
|
||||
})
|
||||
}
|
||||
|
||||
// Test_HasField_Negative tests HasField for non-existent field
|
||||
func Test_HasField_Negative(t *testing.T) {
|
||||
table := createInitTable()
|
||||
defer dropTable(table)
|
||||
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
has, err := db.GetCore().HasField(ctx, table, "non_exist_field")
|
||||
t.AssertNil(err)
|
||||
t.Assert(has, false)
|
||||
})
|
||||
}
|
||||
|
||||
// Test_HasField_Schema tests HasField with explicit schema
|
||||
func Test_HasField_Schema(t *testing.T) {
|
||||
table := createInitTable()
|
||||
defer dropTable(table)
|
||||
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
has, err := db.GetCore().HasField(ctx, table, "id", TestSchema1)
|
||||
t.AssertNil(err)
|
||||
t.Assert(has, true)
|
||||
})
|
||||
}
|
||||
|
||||
// Test_QuoteWord_Basic tests basic QuoteWord functionality
|
||||
func Test_QuoteWord_Basic(t *testing.T) {
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
quoted := db.GetCore().QuoteWord("user")
|
||||
t.Assert(quoted, "`user`")
|
||||
|
||||
quoted = db.GetCore().QuoteWord("user_table")
|
||||
t.Assert(quoted, "`user_table`")
|
||||
})
|
||||
}
|
||||
|
||||
// Test_QuoteWord_AlreadyQuoted tests QuoteWord with already quoted words
|
||||
func Test_QuoteWord_AlreadyQuoted(t *testing.T) {
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
// If already quoted, should not double quote
|
||||
quoted := db.GetCore().QuoteWord("`user`")
|
||||
t.Assert(quoted, "`user`")
|
||||
})
|
||||
}
|
||||
Reference in New Issue
Block a user