From 2066aa4803ecaa5ddb42f4d2e5916227a29be9d1 Mon Sep 17 00:00:00 2001 From: John Guo Date: Sat, 7 Dec 2024 14:01:31 +0800 Subject: [PATCH] feat(database/gdb): add transaction propagation&isolation level&readonly features (#4013) --- .../drivers/mysql/mysql_z_unit_init_test.go | 4 +- .../mysql/mysql_z_unit_transaction_test.go | 625 +++++++++++++++++- database/gdb/gdb.go | 383 +++++++++-- database/gdb/gdb_core.go | 14 +- database/gdb/gdb_core_transaction.go | 576 +++++----------- database/gdb/gdb_core_txcore.go | 412 ++++++++++++ database/gdb/gdb_core_underlying.go | 14 +- database/gdb/gdb_model_transaction.go | 14 + 8 files changed, 1527 insertions(+), 515 deletions(-) create mode 100644 database/gdb/gdb_core_txcore.go diff --git a/contrib/drivers/mysql/mysql_z_unit_init_test.go b/contrib/drivers/mysql/mysql_z_unit_init_test.go index 1326378c3..f0621741c 100644 --- a/contrib/drivers/mysql/mysql_z_unit_init_test.go +++ b/contrib/drivers/mysql/mysql_z_unit_init_test.go @@ -10,6 +10,7 @@ import ( "context" "fmt" "testing" + "time" "github.com/gogf/gf/v2/container/garray" "github.com/gogf/gf/v2/database/gdb" @@ -41,7 +42,8 @@ var ( func init() { nodeDefault := gdb.ConfigNode{ - Link: fmt.Sprintf("mysql:root:%s@tcp(127.0.0.1:3306)/?loc=Local&parseTime=true", TestDbPass), + ExecTimeout: time.Second * 2, + Link: fmt.Sprintf("mysql:root:%s@tcp(127.0.0.1:3306)/?loc=Local&parseTime=true", TestDbPass), } partitionDefault := gdb.ConfigNode{ Link: fmt.Sprintf("mysql:root:%s@tcp(127.0.0.1:3307)/?loc=Local&parseTime=true", TestDbPass), diff --git a/contrib/drivers/mysql/mysql_z_unit_transaction_test.go b/contrib/drivers/mysql/mysql_z_unit_transaction_test.go index b0e2a0cc1..89ed7eba7 100644 --- a/contrib/drivers/mysql/mysql_z_unit_transaction_test.go +++ b/contrib/drivers/mysql/mysql_z_unit_transaction_test.go @@ -8,6 +8,7 @@ package mysql_test import ( "context" + "database/sql" "fmt" "testing" @@ -807,12 +808,12 @@ func Test_Transaction_Nested_TX_Transaction_UseTX(t *testing.T) { ) err = db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { // commit - err = tx.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { - err = tx.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { - err = tx.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { - err = tx.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { - err = tx.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { - _, err = tx.Model(table).Data(g.Map{ + err = tx.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error { + err = tx2.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error { + err = tx2.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error { + err = tx2.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error { + err = tx2.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error { + _, err = tx2.Model(table).Data(g.Map{ "id": 1, "passport": "USER_1", "password": "PASS_1", @@ -842,8 +843,8 @@ func Test_Transaction_Nested_TX_Transaction_UseTX(t *testing.T) { t.AssertNil(err) // rollback - err = tx.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { - _, err = tx.Model(table).Data(g.Map{ + err = tx.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error { + _, err = tx2.Model(table).Data(g.Map{ "id": 2, "passport": "USER_2", "password": "PASS_2", @@ -869,12 +870,12 @@ func Test_Transaction_Nested_TX_Transaction_UseTX(t *testing.T) { // another record. err = db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { // commit - err = tx.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { - err = tx.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { - err = tx.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { - err = tx.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { - err = tx.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { - _, err = tx.Model(table).Data(g.Map{ + err = tx.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error { + err = tx2.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error { + err = tx2.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error { + err = tx2.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error { + err = tx2.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error { + _, err = tx2.Model(table).Data(g.Map{ "id": 3, "passport": "USER_1", "password": "PASS_1", @@ -904,8 +905,8 @@ func Test_Transaction_Nested_TX_Transaction_UseTX(t *testing.T) { t.AssertNil(err) // rollback - err = tx.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { - _, err = tx.Model(table).Data(g.Map{ + err = tx.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error { + _, err = tx2.Model(table).Data(g.Map{ "id": 4, "passport": "USER_2", "password": "PASS_2", @@ -945,11 +946,11 @@ func Test_Transaction_Nested_TX_Transaction_UseDB(t *testing.T) { ) err = db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { // commit - err = db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { - err = db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { - err = db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { - err = db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { - err = db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { + err = db.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error { + err = db.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error { + err = db.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error { + err = db.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error { + err = db.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error { _, err = db.Model(table).Ctx(ctx).Data(g.Map{ "id": 1, "passport": "USER_1", @@ -980,8 +981,8 @@ func Test_Transaction_Nested_TX_Transaction_UseDB(t *testing.T) { t.AssertNil(err) // rollback - err = db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { - _, err = tx.Model(table).Ctx(ctx).Data(g.Map{ + err = db.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error { + _, err = tx2.Model(table).Ctx(ctx).Data(g.Map{ "id": 2, "passport": "USER_2", "password": "PASS_2", @@ -1007,11 +1008,11 @@ func Test_Transaction_Nested_TX_Transaction_UseDB(t *testing.T) { err = db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { // commit - err = db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { - err = db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { - err = db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { - err = db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { - err = db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { + err = db.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error { + err = db.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error { + err = db.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error { + err = db.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error { + err = db.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error { _, err = db.Model(table).Ctx(ctx).Data(g.Map{ "id": 3, "passport": "USER_1", @@ -1042,8 +1043,8 @@ func Test_Transaction_Nested_TX_Transaction_UseDB(t *testing.T) { t.AssertNil(err) // rollback - err = db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { - _, err = tx.Model(table).Ctx(ctx).Data(g.Map{ + err = db.Transaction(ctx, func(ctx context.Context, tx2 gdb.TX) error { + _, err = tx2.Model(table).Ctx(ctx).Data(g.Map{ "id": 4, "passport": "USER_2", "password": "PASS_2", @@ -1143,3 +1144,567 @@ func Test_Transaction_Method(t *testing.T) { t.Assert(count, int64(0)) }) } + +func Test_Transaction_Propagation(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + table := createTable() + defer dropTable(table) + + // Test PropagationRequired + err := db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { + // Insert initial record + _, err := tx.Insert(table, g.Map{ + "id": 1, + "passport": "required", + }) + t.AssertNil(err) + + // Nested transaction with PropagationRequired + err = tx.TransactionWithOptions(ctx, gdb.TxOptions{ + Propagation: gdb.PropagationRequired, + }, func(ctx context.Context, tx2 gdb.TX) error { + // Should use the same transaction + _, err := tx2.Insert(table, g.Map{ + "id": 2, + "passport": "required_nested", + }) + return err + }) + t.AssertNil(err) + + return nil + }) + t.AssertNil(err) + + // Verify both records exist + count, err := db.Model(table).Count() + t.AssertNil(err) + t.Assert(count, int64(2)) + }) + + gtest.C(t, func(t *gtest.T) { + table := createTable() + defer dropTable(table) + + // Test PropagationRequiresNew + err := db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { + // Insert in outer transaction + _, err := tx.Insert(table, g.Map{ + "id": 3, + "passport": "outer", + }) + t.AssertNil(err) + + // Inner transaction with PropagationRequiresNew + err = tx.TransactionWithOptions(ctx, gdb.TxOptions{ + Propagation: gdb.PropagationRequiresNew, + }, func(ctx context.Context, tx2 gdb.TX) error { + // This is a new transaction + _, _ = tx2.Insert(table, g.Map{ + "id": 4, + "passport": "inner_new", + }) + // Simulate error to test independent rollback + return gerror.New("rollback inner transaction") + }) + // Inner transaction error should not affect outer transaction + t.AssertNE(err, nil) + + return nil + }) + t.AssertNil(err) + + // Verify only outer transaction record exists + count, err := db.Model(table).Where("passport", "outer").Count() + t.AssertNil(err) + t.Assert(count, int64(1)) + }) + + gtest.C(t, func(t *gtest.T) { + table := createTable() + defer dropTable(table) + + // Test PropagationNested + err := db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { + // Insert in outer transaction + _, err := tx.Insert(table, g.Map{ + "id": 5, + "passport": "nested_outer", + }) + t.AssertNil(err) + + // Nested transaction + err = tx.TransactionWithOptions(ctx, gdb.TxOptions{ + Propagation: gdb.PropagationNested, + }, func(ctx context.Context, tx2 gdb.TX) error { + _, _ = tx2.Insert(table, g.Map{ + "id": 6, + "passport": "nested_inner", + }) + // Simulate error to test savepoint rollback + return gerror.New("rollback to savepoint") + }) + t.AssertNE(err, nil) + + // Insert another record after nested transaction rollback + _, err = tx.Insert(table, g.Map{ + "id": 7, + "passport": "nested_after", + }) + t.AssertNil(err) + + return nil + }) + t.AssertNil(err) + + // Verify outer transaction records exist, but nested transaction record doesn't + count, err := db.Model(table).Where("passport", "nested_inner").Count() + t.AssertNil(err) + t.Assert(count, int64(0)) + + count, err = db.Model(table).Where("passport IN(?,?)", + "nested_outer", "nested_after").Count() + t.AssertNil(err) + t.Assert(count, int64(2)) + }) + + gtest.C(t, func(t *gtest.T) { + table := createTable() + defer dropTable(table) + + // Test PropagationNotSupported + err := db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { + // Insert in transaction + _, err := tx.Insert(table, g.Map{ + "id": 8, + "passport": "tx_record", + }) + t.AssertNil(err) + + // Non-transactional operation + err = tx.TransactionWithOptions(ctx, gdb.TxOptions{ + Propagation: gdb.PropagationNotSupported, + }, func(ctx context.Context, tx2 gdb.TX) error { + // Should execute without transaction + t.Assert(tx2, nil) + _, err := db.Insert(ctx, table, g.Map{ + "id": 9, + "passport": "non_tx_record", + }) + return err + }) + t.AssertNil(err) + + return gerror.New("rollback outer transaction") + }) + t.AssertNE(err, nil) + + // Verify transactional record is rolled back but non-transactional record exists + count, err := db.Model(table).Where("passport", "tx_record").Count() + t.AssertNil(err) + t.Assert(count, int64(0)) + + count, err = db.Model(table).Where("passport", "non_tx_record").Count() + t.AssertNil(err) + t.Assert(count, int64(1)) + }) + + gtest.C(t, func(t *gtest.T) { + table := createTable() + defer dropTable(table) + + // Test PropagationMandatory + err := db.TransactionWithOptions(ctx, gdb.TxOptions{ + Propagation: gdb.PropagationMandatory, + }, func(ctx context.Context, tx gdb.TX) error { + return nil + }) + // Should fail because no transaction exists + t.AssertNE(err, nil) + + // Test within an existing transaction + err = db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { + return tx.TransactionWithOptions(ctx, gdb.TxOptions{ + Propagation: gdb.PropagationMandatory, + }, func(ctx context.Context, tx2 gdb.TX) error { + // Should succeed because transaction exists + _, err := tx2.Insert(table, g.Map{ + "id": 10, + "passport": "mandatory", + }) + return err + }) + }) + t.AssertNil(err) + }) + + gtest.C(t, func(t *gtest.T) { + table := createTable() + defer dropTable(table) + + // Test PropagationNever + err := db.TransactionWithOptions(ctx, gdb.TxOptions{ + Propagation: gdb.PropagationNever, + }, func(ctx context.Context, tx gdb.TX) error { + // Should execute without transaction + t.Assert(tx, nil) + _, err := db.Insert(ctx, table, g.Map{ + "id": 11, + "passport": "never", + }) + return err + }) + t.AssertNil(err) + + // Test within an existing transaction + err = db.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { + return tx.TransactionWithOptions(ctx, gdb.TxOptions{ + Propagation: gdb.PropagationNever, + }, func(ctx context.Context, tx2 gdb.TX) error { + return nil + }) + }) + // Should fail because transaction exists + t.AssertNE(err, nil) + }) +} + +func Test_Transaction_Propagation_Complex(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + table1 := createTable() + table2 := createTable() + defer dropTable(table1) + defer dropTable(table2) + + // Test nested transactions with different propagation behaviors + err := db.Transaction(ctx, func(ctx context.Context, tx1 gdb.TX) error { + // Insert in outer transaction + _, err := tx1.Insert(table1, g.Map{ + "id": 1, + "passport": "outer", + }) + t.AssertNil(err) + + // First nested transaction (NESTED) + err = tx1.TransactionWithOptions(ctx, gdb.TxOptions{ + Propagation: gdb.PropagationNested, + }, func(ctx context.Context, tx2 gdb.TX) error { + _, err := tx2.Insert(table1, g.Map{ + "id": 2, + "passport": "nested1", + }) + t.AssertNil(err) + + // Second nested transaction (REQUIRES_NEW) + err = tx2.TransactionWithOptions(ctx, gdb.TxOptions{ + Propagation: gdb.PropagationRequiresNew, + }, func(ctx context.Context, tx3 gdb.TX) error { + _, _ = tx3.Insert(table1, g.Map{ + "id": 3, + "passport": "new1", + }) + // This will be rolled back independently + return gerror.New("rollback new transaction") + }) + t.AssertNE(err, nil) + + // Third nested transaction (NESTED) + return tx2.TransactionWithOptions(ctx, gdb.TxOptions{ + Propagation: gdb.PropagationNested, + }, func(ctx context.Context, tx3 gdb.TX) error { + _, _ = tx3.Insert(table1, g.Map{ + "id": 4, + "passport": "nested2", + }) + // This will rollback to the savepoint + return gerror.New("rollback nested transaction") + }) + }) + t.AssertNE(err, nil) + + // Fourth transaction (NOT_SUPPORTED) + // Note that, it locks table if it continues using table1. + err = tx1.TransactionWithOptions(ctx, gdb.TxOptions{ + Propagation: gdb.PropagationNotSupported, + }, func(ctx context.Context, tx2 gdb.TX) error { + // Should execute without transaction + t.Assert(tx2, nil) + + _, err := db.Insert(ctx, table2, g.Map{ + "id": 5, + "passport": "not_supported", + }) + return err + }) + t.AssertNil(err) + + return nil + }) + t.AssertNil(err) + + // Verify final state + // 1. "outer" should exist (committed) + count, err := db.Model(table1).Where("passport", "outer").Count() + t.AssertNil(err) + t.Assert(count, int64(1)) + + // 2. "nested1" should not exist (rolled back due to error) + count, err = db.Model(table1).Where("passport", "nested1").Count() + t.AssertNil(err) + t.Assert(count, int64(0)) + + // 3. "new1" should not exist (rolled back independently) + count, err = db.Model(table1).Where("passport", "new1").Count() + t.AssertNil(err) + t.Assert(count, int64(0)) + + // 4. "nested2" should not exist (rolled back to savepoint) + count, err = db.Model(table1).Where("passport", "nested2").Count() + t.AssertNil(err) + t.Assert(count, int64(0)) + + // 5. "not_supported" should exist (non-transactional) + count, err = db.Model(table2).Where("passport", "not_supported").Count() + t.AssertNil(err) + t.Assert(count, int64(1)) + }) + + gtest.C(t, func(t *gtest.T) { + table := createTable() + defer dropTable(table) + + // Test transaction suspension and resume + err := db.Transaction(ctx, func(ctx context.Context, tx1 gdb.TX) error { + // Insert in outer transaction + _, err := tx1.Insert(table, g.Map{ + "id": 6, + "passport": "suspend_outer", + "password": "pass6", + "nickname": "suspend_outer", + "create_time": gtime.Now().String(), + }) + t.AssertNil(err) + + // Suspend current transaction (NOT_SUPPORTED) + err = tx1.TransactionWithOptions(ctx, gdb.TxOptions{ + Propagation: gdb.PropagationNotSupported, + }, func(ctx context.Context, tx2 gdb.TX) error { + // Should execute without transaction + t.Assert(tx2, nil) + + // Start a new independent transaction + return db.Transaction(ctx, func(ctx context.Context, tx3 gdb.TX) error { + _, err := tx3.Insert(table, g.Map{ + "id": 7, + "passport": "independent", + "password": "pass7", + "nickname": "independent", + "create_time": gtime.Now().String(), + }) + return err + }) + }) + t.AssertNil(err) + + // Resume original transaction + _, err = tx1.Insert(table, g.Map{ + "id": 8, + "passport": "suspend_resume", + "password": "pass8", + "nickname": "suspend_resume", + "create_time": gtime.Now().String(), + }) + t.AssertNil(err) + + // Simulate error to rollback outer transaction + return gerror.New("rollback outer transaction") + }) + t.AssertNE(err, nil) + + // Verify final state + // 1. "suspend_outer" and "suspend_resume" should not exist (rolled back) + count, err := db.Model(table).Where("passport IN(?,?)", + "suspend_outer", "suspend_resume").Count() + t.AssertNil(err) + t.Assert(count, int64(0)) + + // 2. "independent" should exist (committed independently) + count, err = db.Model(table).Where("passport", "independent").Count() + t.AssertNil(err) + t.Assert(count, int64(1)) + }) +} + +func Test_Transaction_ReadOnly(t *testing.T) { + table := createInitTable() + defer dropTable(table) + + gtest.C(t, func(t *gtest.T) { + // Test read-only transaction + err := db.TransactionWithOptions(ctx, gdb.TxOptions{ + ReadOnly: true, + }, func(ctx context.Context, tx gdb.TX) error { + // Try to modify data in read-only transaction + _, err := tx.Update(table, g.Map{"passport": "changed"}, "id=1") + // Should return error + return err + }) + t.AssertNE(err, nil) + + // Verify data was not modified + v, err := db.Model(table).Where("id=1").Value("passport") + t.AssertNil(err) + t.Assert(v.String(), "user_1") + }) +} + +func Test_Transaction_Isolation(t *testing.T) { + // Test READ UNCOMMITTED + gtest.C(t, func(t *gtest.T) { + table := createInitTable() + defer dropTable(table) + err := db.TransactionWithOptions(ctx, gdb.TxOptions{ + Isolation: sql.LevelReadUncommitted, + }, func(ctx context.Context, tx1 gdb.TX) error { + // Update value in first transaction + _, err := tx1.Update(table, g.Map{"passport": "dirty_read"}, "id=1") + t.AssertNil(err) + + // Start another transaction to verify dirty read + err = db.TransactionWithOptions(ctx, gdb.TxOptions{ + Isolation: sql.LevelReadUncommitted, + }, func(ctx context.Context, tx2 gdb.TX) error { + // Should see uncommitted change in READ UNCOMMITTED + v, err := tx2.Model(table).Where("id=1").Value("passport") + t.AssertNil(err) + t.Assert(v.String(), "dirty_read") + return nil + }) + t.AssertNil(err) + + // Rollback the first transaction + return gerror.New("rollback first transaction") + }) + t.AssertNE(err, nil) + + // Verify the value is rolled back + v, err := db.Model(table).Where("id=1").Value("passport") + t.AssertNil(err) + t.Assert(v.String(), "user_1") + }) + + // Test REPEATABLE READ (default) + gtest.C(t, func(t *gtest.T) { + table := createInitTable() + defer dropTable(table) + + // Start a transaction with REPEATABLE READ isolation + err := db.TransactionWithOptions(ctx, gdb.TxOptions{ + Propagation: gdb.PropagationRequiresNew, + Isolation: sql.LevelRepeatableRead, + }, func(ctx context.Context, tx1 gdb.TX) error { + // First read + v1, err := tx1.Model(table).Where("id=1").Value("passport") + t.AssertNil(err) + initialValue := v1.String() + + // Another transaction updates and commits the value + err = db.TransactionWithOptions(ctx, gdb.TxOptions{ + Propagation: gdb.PropagationRequiresNew, + }, func(ctx context.Context, tx2 gdb.TX) error { + _, err := tx2.Update(table, g.Map{ + "passport": "changed_value", + }, "id=1") + t.AssertNil(err) + return nil + }) + t.AssertNil(err) + + // Verify the change is visible outside transaction + v, err := db.Model(table).Where("id=1").Value("passport") + t.AssertNil(err) + t.Assert(v.String(), "changed_value") + + // Should still see old value in REPEATABLE READ transaction + v2, err := tx1.Model(table).Where("id=1").Value("passport") + t.AssertNil(err) + t.Assert(v2.String(), initialValue) + + // Even after multiple reads, should still see the same value + v3, err := tx1.Model(table).Where("id=1").Value("passport") + t.AssertNil(err) + t.Assert(v3.String(), initialValue) + + return nil + }) + t.AssertNil(err) + + // After transaction ends, should see the committed change + v, err := db.Model(table).Where("id=1").Value("passport") + t.AssertNil(err) + t.Assert(v.String(), "changed_value") + }) + + // Test SERIALIZABLE + gtest.C(t, func(t *gtest.T) { + table := createInitTable() + defer dropTable(table) + + err := db.TransactionWithOptions(ctx, gdb.TxOptions{ + Propagation: gdb.PropagationRequiresNew, + Isolation: sql.LevelSerializable, + }, func(ctx context.Context, tx1 gdb.TX) error { + // Read all records + _, err := tx1.Model(table).All() + t.AssertNil(err) + + // Try concurrent insert in another transaction + err = db.TransactionWithOptions(ctx, gdb.TxOptions{ + Propagation: gdb.PropagationRequiresNew, + Isolation: sql.LevelSerializable, + }, func(ctx context.Context, tx2 gdb.TX) error { + _, err := tx2.Insert(table, g.Map{ + "id": 1000, + "passport": "new_user", + }) + return err + }) + t.AssertNE(err, nil) + return nil + }) + t.AssertNil(err) + }) + + // Test READ COMMITTED + gtest.C(t, func(t *gtest.T) { + table := createInitTable() + defer dropTable(table) + err := db.TransactionWithOptions(ctx, gdb.TxOptions{ + Propagation: gdb.PropagationRequiresNew, + Isolation: sql.LevelReadCommitted, + }, func(ctx context.Context, tx1 gdb.TX) error { + // First read + v1, err := tx1.Model(table).Where("id=1").Value("passport") + t.AssertNil(err) + initialValue := v1.String() + + // Another transaction updates and commits + err = db.TransactionWithOptions(ctx, gdb.TxOptions{ + Propagation: gdb.PropagationRequiresNew, + Isolation: sql.LevelReadCommitted, + }, func(ctx context.Context, tx2 gdb.TX) error { + _, err := tx2.Update(table, g.Map{"passport": "committed_value"}, "id=1") + return err + }) + t.AssertNil(err) + + // Should see new value in READ COMMITTED + v2, err := tx1.Model(table).Where("id=1").Value("passport") + t.AssertNil(err) + t.Assert(v2.String(), "committed_value") + t.AssertNE(v2.String(), initialValue) + return nil + }) + t.AssertNil(err) + }) +} diff --git a/database/gdb/gdb.go b/database/gdb/gdb.go index f9851dbac..a8c2a2bdf 100644 --- a/database/gdb/gdb.go +++ b/database/gdb/gdb.go @@ -48,7 +48,7 @@ type DB interface { // Raw creates and returns a model based on a raw sql not a table. Raw(rawSql string, args ...interface{}) *Model - // Schema creates and returns a schema. + // Schema switches to a specified schema. // Also see Core.Schema. Schema(schema string) *Schema @@ -58,7 +58,6 @@ type DB interface { // Open creates a raw connection object for database with given node configuration. // Note that it is not recommended using the function manually. - // Also see DriverMysql.Open. Open(config *ConfigNode) (*sql.DB, error) // Ctx is a chaining function, which creates and returns a new DB that is a shallow copy @@ -78,173 +77,422 @@ type DB interface { // Query APIs. // =========================================================================== - Query(ctx context.Context, sql string, args ...interface{}) (Result, error) // See Core.Query. - Exec(ctx context.Context, sql string, args ...interface{}) (sql.Result, error) // See Core.Exec. - Prepare(ctx context.Context, sql string, execOnMaster ...bool) (*Stmt, error) // See Core.Prepare. + // Query executes a SQL query that returns rows using given SQL and arguments. + // The args are for any placeholder parameters in the query. + Query(ctx context.Context, sql string, args ...interface{}) (Result, error) + + // Exec executes a SQL query that doesn't return rows (e.g., INSERT, UPDATE, DELETE). + // It returns sql.Result for accessing LastInsertId or RowsAffected. + Exec(ctx context.Context, sql string, args ...interface{}) (sql.Result, error) + + // Prepare creates a prepared statement for later queries or executions. + // The execOnMaster parameter determines whether the statement executes on master node. + Prepare(ctx context.Context, sql string, execOnMaster ...bool) (*Stmt, error) // =========================================================================== // Common APIs for CURD. // =========================================================================== - Insert(ctx context.Context, table string, data interface{}, batch ...int) (sql.Result, error) // See Core.Insert. - InsertIgnore(ctx context.Context, table string, data interface{}, batch ...int) (sql.Result, error) // See Core.InsertIgnore. - InsertAndGetId(ctx context.Context, table string, data interface{}, batch ...int) (int64, error) // See Core.InsertAndGetId. - Replace(ctx context.Context, table string, data interface{}, batch ...int) (sql.Result, error) // See Core.Replace. - Save(ctx context.Context, table string, data interface{}, batch ...int) (sql.Result, error) // See Core.Save. - Update(ctx context.Context, table string, data interface{}, condition interface{}, args ...interface{}) (sql.Result, error) // See Core.Update. - Delete(ctx context.Context, table string, condition interface{}, args ...interface{}) (sql.Result, error) // See Core.Delete. + // Insert inserts one or multiple records into table. + // The data can be a map, struct, or slice of maps/structs. + // The optional batch parameter specifies the batch size for bulk inserts. + Insert(ctx context.Context, table string, data interface{}, batch ...int) (sql.Result, error) + + // InsertIgnore inserts records but ignores duplicate key errors. + // It works like Insert but adds IGNORE keyword to the SQL statement. + InsertIgnore(ctx context.Context, table string, data interface{}, batch ...int) (sql.Result, error) + + // InsertAndGetId inserts a record and returns the auto-generated ID. + // It's a convenience method combining Insert with LastInsertId. + InsertAndGetId(ctx context.Context, table string, data interface{}, batch ...int) (int64, error) + + // Replace inserts or replaces records using REPLACE INTO syntax. + // Existing records with same unique key will be deleted and re-inserted. + Replace(ctx context.Context, table string, data interface{}, batch ...int) (sql.Result, error) + + // Save inserts or updates records using INSERT ... ON DUPLICATE KEY UPDATE syntax. + // It updates existing records instead of replacing them entirely. + Save(ctx context.Context, table string, data interface{}, batch ...int) (sql.Result, error) + + // Update updates records in table that match the condition. + // The data can be a map or struct containing the new values. + // The condition specifies the WHERE clause with optional placeholder args. + Update(ctx context.Context, table string, data interface{}, condition interface{}, args ...interface{}) (sql.Result, error) + + // Delete deletes records from table that match the condition. + // The condition specifies the WHERE clause with optional placeholder args. + Delete(ctx context.Context, table string, condition interface{}, args ...interface{}) (sql.Result, error) // =========================================================================== // Internal APIs for CURD, which can be overwritten by custom CURD implements. // =========================================================================== - DoSelect(ctx context.Context, link Link, sql string, args ...interface{}) (result Result, err error) // See Core.DoSelect. - DoInsert(ctx context.Context, link Link, table string, data List, option DoInsertOption) (result sql.Result, err error) // See Core.DoInsert. - DoUpdate(ctx context.Context, link Link, table string, data interface{}, condition string, args ...interface{}) (result sql.Result, err error) // See Core.DoUpdate. - DoDelete(ctx context.Context, link Link, table string, condition string, args ...interface{}) (result sql.Result, err error) // See Core.DoDelete. + // DoSelect executes a SELECT query using the given link and returns the result. + // This is an internal method that can be overridden by custom implementations. + DoSelect(ctx context.Context, link Link, sql string, args ...interface{}) (result Result, err error) - DoQuery(ctx context.Context, link Link, sql string, args ...interface{}) (result Result, err error) // See Core.DoQuery. - DoExec(ctx context.Context, link Link, sql string, args ...interface{}) (result sql.Result, err error) // See Core.DoExec. + // DoInsert performs the actual INSERT operation with given options. + // This is an internal method that can be overridden by custom implementations. + DoInsert(ctx context.Context, link Link, table string, data List, option DoInsertOption) (result sql.Result, err error) - DoFilter(ctx context.Context, link Link, sql string, args []interface{}) (newSql string, newArgs []interface{}, err error) // See Core.DoFilter. - DoCommit(ctx context.Context, in DoCommitInput) (out DoCommitOutput, err error) // See Core.DoCommit. + // DoUpdate performs the actual UPDATE operation. + // This is an internal method that can be overridden by custom implementations. + DoUpdate(ctx context.Context, link Link, table string, data interface{}, condition string, args ...interface{}) (result sql.Result, err error) - DoPrepare(ctx context.Context, link Link, sql string) (*Stmt, error) // See Core.DoPrepare. + // DoDelete performs the actual DELETE operation. + // This is an internal method that can be overridden by custom implementations. + DoDelete(ctx context.Context, link Link, table string, condition string, args ...interface{}) (result sql.Result, err error) + + // DoQuery executes a query that returns rows. + // This is an internal method that can be overridden by custom implementations. + DoQuery(ctx context.Context, link Link, sql string, args ...interface{}) (result Result, err error) + + // DoExec executes a query that doesn't return rows. + // This is an internal method that can be overridden by custom implementations. + DoExec(ctx context.Context, link Link, sql string, args ...interface{}) (result sql.Result, err error) + + // DoFilter processes and filters SQL and args before execution. + // This is an internal method that can be overridden to implement custom SQL filtering. + DoFilter(ctx context.Context, link Link, sql string, args []interface{}) (newSql string, newArgs []interface{}, err error) + + // DoCommit handles the actual commit operation for transactions. + // This is an internal method that can be overridden by custom implementations. + DoCommit(ctx context.Context, in DoCommitInput) (out DoCommitOutput, err error) + + // DoPrepare creates a prepared statement on the given link. + // This is an internal method that can be overridden by custom implementations. + DoPrepare(ctx context.Context, link Link, sql string) (*Stmt, error) // =========================================================================== // Query APIs for convenience purpose. // =========================================================================== - GetAll(ctx context.Context, sql string, args ...interface{}) (Result, error) // See Core.GetAll. - GetOne(ctx context.Context, sql string, args ...interface{}) (Record, error) // See Core.GetOne. - GetValue(ctx context.Context, sql string, args ...interface{}) (Value, error) // See Core.GetValue. - GetArray(ctx context.Context, sql string, args ...interface{}) ([]Value, error) // See Core.GetArray. - GetCount(ctx context.Context, sql string, args ...interface{}) (int, error) // See Core.GetCount. - GetScan(ctx context.Context, objPointer interface{}, sql string, args ...interface{}) error // See Core.GetScan. - Union(unions ...*Model) *Model // See Core.Union. - UnionAll(unions ...*Model) *Model // See Core.UnionAll. + // GetAll executes a query and returns all rows as Result. + // It's a convenience wrapper around Query. + GetAll(ctx context.Context, sql string, args ...interface{}) (Result, error) + + // GetOne executes a query and returns the first row as Record. + // It's useful when you expect only one row to be returned. + GetOne(ctx context.Context, sql string, args ...interface{}) (Record, error) + + // GetValue executes a query and returns the first column of the first row. + // It's useful for queries like SELECT COUNT(*) or getting a single value. + GetValue(ctx context.Context, sql string, args ...interface{}) (Value, error) + + // GetArray executes a query and returns the first column of all rows. + // It's useful for queries like SELECT id FROM table. + GetArray(ctx context.Context, sql string, args ...interface{}) ([]Value, error) + + // GetCount executes a COUNT query and returns the result as an integer. + // It's a convenience method for counting rows. + GetCount(ctx context.Context, sql string, args ...interface{}) (int, error) + + // GetScan executes a query and scans the result into the given object pointer. + // It automatically maps database columns to struct fields or slice elements. + GetScan(ctx context.Context, objPointer interface{}, sql string, args ...interface{}) error + + // Union combines multiple SELECT queries using UNION operator. + // It returns a new Model that represents the combined query. + Union(unions ...*Model) *Model + + // UnionAll combines multiple SELECT queries using UNION ALL operator. + // Unlike Union, it keeps duplicate rows in the result. + UnionAll(unions ...*Model) *Model // =========================================================================== // Master/Slave specification support. // =========================================================================== - Master(schema ...string) (*sql.DB, error) // See Core.Master. - Slave(schema ...string) (*sql.DB, error) // See Core.Slave. + // Master returns a connection to the master database node. + // The optional schema parameter specifies which database schema to use. + Master(schema ...string) (*sql.DB, error) + + // Slave returns a connection to a slave database node. + // The optional schema parameter specifies which database schema to use. + Slave(schema ...string) (*sql.DB, error) // =========================================================================== // Ping-Pong. // =========================================================================== - PingMaster() error // See Core.PingMaster. - PingSlave() error // See Core.PingSlave. + // PingMaster checks if the master database node is accessible. + // It returns an error if the connection fails. + PingMaster() error + + // PingSlave checks if any slave database node is accessible. + // It returns an error if no slave connections are available. + PingSlave() error // =========================================================================== // Transaction. // =========================================================================== - Begin(ctx context.Context) (TX, error) // See Core.Begin. - Transaction(ctx context.Context, f func(ctx context.Context, tx TX) error) error // See Core.Transaction. + // Begin starts a new transaction and returns a TX interface. + // The returned TX must be committed or rolled back to release resources. + Begin(ctx context.Context) (TX, error) + + // BeginWithOptions starts a new transaction with the given options and returns a TX interface. + // The options allow specifying isolation level and read-only mode. + // The returned TX must be committed or rolled back to release resources. + BeginWithOptions(ctx context.Context, opts TxOptions) (TX, error) + + // Transaction executes a function within a transaction. + // It automatically handles commit/rollback based on whether f returns an error. + Transaction(ctx context.Context, f func(ctx context.Context, tx TX) error) error + + // TransactionWithOptions executes a function within a transaction with specific options. + // It allows customizing transaction behavior like isolation level and timeout. + TransactionWithOptions(ctx context.Context, opts TxOptions, f func(ctx context.Context, tx TX) error) error // =========================================================================== // Configuration methods. // =========================================================================== - GetCache() *gcache.Cache // See Core.GetCache. - SetDebug(debug bool) // See Core.SetDebug. - GetDebug() bool // See Core.GetDebug. - GetSchema() string // See Core.GetSchema. - GetPrefix() string // See Core.GetPrefix. - GetGroup() string // See Core.GetGroup. - SetDryRun(enabled bool) // See Core.SetDryRun. - GetDryRun() bool // See Core.GetDryRun. - SetLogger(logger glog.ILogger) // See Core.SetLogger. - GetLogger() glog.ILogger // See Core.GetLogger. - GetConfig() *ConfigNode // See Core.GetConfig. - SetMaxIdleConnCount(n int) // See Core.SetMaxIdleConnCount. - SetMaxOpenConnCount(n int) // See Core.SetMaxOpenConnCount. - SetMaxConnLifeTime(d time.Duration) // See Core.SetMaxConnLifeTime. + // GetCache returns the cache instance used by this database. + // The cache is used for query results caching. + GetCache() *gcache.Cache + + // SetDebug enables or disables debug mode for SQL logging. + // When enabled, all SQL statements and their execution time are logged. + SetDebug(debug bool) + + // GetDebug returns whether debug mode is enabled. + GetDebug() bool + + // GetSchema returns the current database schema name. + GetSchema() string + + // GetPrefix returns the table name prefix used by this database. + GetPrefix() string + + // GetGroup returns the configuration group name of this database. + GetGroup() string + + // SetDryRun enables or disables dry-run mode. + // In dry-run mode, SQL statements are generated but not executed. + SetDryRun(enabled bool) + + // GetDryRun returns whether dry-run mode is enabled. + GetDryRun() bool + + // SetLogger sets a custom logger for database operations. + // The logger must implement glog.ILogger interface. + SetLogger(logger glog.ILogger) + + // GetLogger returns the current logger used by this database. + GetLogger() glog.ILogger + + // GetConfig returns the configuration node used by this database. + GetConfig() *ConfigNode + + // SetMaxIdleConnCount sets the maximum number of idle connections in the pool. + SetMaxIdleConnCount(n int) + + // SetMaxOpenConnCount sets the maximum number of open connections to the database. + SetMaxOpenConnCount(n int) + + // SetMaxConnLifeTime sets the maximum amount of time a connection may be reused. + SetMaxConnLifeTime(d time.Duration) // =========================================================================== // Utility methods. // =========================================================================== - Stats(ctx context.Context) []StatsItem // See Core.Stats. - GetCtx() context.Context // See Core.GetCtx. - GetCore() *Core // See Core.GetCore - GetChars() (charLeft string, charRight string) // See Core.GetChars. - Tables(ctx context.Context, schema ...string) (tables []string, err error) // See Core.Tables. The driver must implement this function. - TableFields(ctx context.Context, table string, schema ...string) (map[string]*TableField, error) // See Core.TableFields. The driver must implement this function. - ConvertValueForField(ctx context.Context, fieldType string, fieldValue interface{}) (interface{}, error) // See Core.ConvertValueForField - ConvertValueForLocal(ctx context.Context, fieldType string, fieldValue interface{}) (interface{}, error) // See Core.ConvertValueForLocal - CheckLocalTypeForField(ctx context.Context, fieldType string, fieldValue interface{}) (LocalType, error) // See Core.CheckLocalTypeForField - FormatUpsert(columns []string, list List, option DoInsertOption) (string, error) // See Core.DoFormatUpsert - OrderRandomFunction() string // See Core.OrderRandomFunction + // Stats returns statistics about the database connection pool. + // It includes information like the number of active and idle connections. + Stats(ctx context.Context) []StatsItem + + // GetCtx returns the context associated with this database instance. + GetCtx() context.Context + + // GetCore returns the underlying Core instance of this database. + GetCore() *Core + + // GetChars returns the left and right quote characters used for escaping identifiers. + // For example, in MySQL these are backticks: ` and `. + GetChars() (charLeft string, charRight string) + + // Tables returns a list of all table names in the specified schema. + // If no schema is specified, it uses the default schema. + Tables(ctx context.Context, schema ...string) (tables []string, err error) + + // TableFields returns detailed information about all fields in the specified table. + // The returned map keys are field names and values contain field metadata. + TableFields(ctx context.Context, table string, schema ...string) (map[string]*TableField, error) + + // ConvertValueForField converts a value to the appropriate type for a database field. + // It handles type conversion from Go types to database-specific types. + ConvertValueForField(ctx context.Context, fieldType string, fieldValue interface{}) (interface{}, error) + + // ConvertValueForLocal converts a database value to the appropriate Go type. + // It handles type conversion from database-specific types to Go types. + ConvertValueForLocal(ctx context.Context, fieldType string, fieldValue interface{}) (interface{}, error) + + // CheckLocalTypeForField checks if a Go value is compatible with a database field type. + // It returns the appropriate LocalType and any conversion errors. + CheckLocalTypeForField(ctx context.Context, fieldType string, fieldValue interface{}) (LocalType, error) + + // FormatUpsert formats an upsert (INSERT ... ON DUPLICATE KEY UPDATE) statement. + // It generates the appropriate SQL based on the columns, values, and options provided. + FormatUpsert(columns []string, list List, option DoInsertOption) (string, error) + + // OrderRandomFunction returns the SQL function for random ordering. + // The implementation is database-specific (e.g., RAND() for MySQL). + OrderRandomFunction() string } // TX defines the interfaces for ORM transaction operations. type TX interface { Link + // Ctx binds a context to current transaction. + // The context is used for operations like timeout control. Ctx(ctx context.Context) TX + + // Raw creates and returns a model based on a raw SQL. + // The rawSql can contain placeholders ? and corresponding args. Raw(rawSql string, args ...interface{}) *Model + + // Model creates and returns a Model from given table name/struct. + // The parameter can be table name as string, or struct/*struct type. Model(tableNameQueryOrStruct ...interface{}) *Model + + // With creates and returns a Model from given object. + // It automatically analyzes the object and generates corresponding SQL. With(object interface{}) *Model // =========================================================================== // Nested transaction if necessary. // =========================================================================== + // Begin starts a nested transaction. + // It creates a new savepoint for current transaction. Begin() error + + // Commit commits current transaction/savepoint. + // For nested transactions, it releases the current savepoint. Commit() error + + // Rollback rolls back current transaction/savepoint. + // For nested transactions, it rolls back to the current savepoint. Rollback() error + + // Transaction executes given function in a nested transaction. + // It automatically handles commit/rollback based on function's error return. Transaction(ctx context.Context, f func(ctx context.Context, tx TX) error) (err error) + // TransactionWithOptions executes given function in a nested transaction with options. + // It allows customizing transaction behavior like isolation level. + TransactionWithOptions(ctx context.Context, opts TxOptions, f func(ctx context.Context, tx TX) error) error + // =========================================================================== // Core method. // =========================================================================== + // Query executes a query that returns rows using given SQL and arguments. + // The args are for any placeholder parameters in the query. Query(sql string, args ...interface{}) (result Result, err error) + + // Exec executes a query that doesn't return rows. + // For example: INSERT, UPDATE, DELETE. Exec(sql string, args ...interface{}) (sql.Result, error) + + // Prepare creates a prepared statement for later queries or executions. + // Multiple queries or executions may be run concurrently from the statement. Prepare(sql string) (*Stmt, error) // =========================================================================== // Query. // =========================================================================== + // GetAll executes a query and returns all rows as Result. + // It's a convenient wrapper for Query. GetAll(sql string, args ...interface{}) (Result, error) + + // GetOne executes a query and returns the first row as Record. + // It's useful when you expect only one row to be returned. GetOne(sql string, args ...interface{}) (Record, error) + + // GetStruct executes a query and scans the result into given struct. + // The obj should be a pointer to struct. GetStruct(obj interface{}, sql string, args ...interface{}) error + + // GetStructs executes a query and scans all results into given struct slice. + // The objPointerSlice should be a pointer to slice of struct. GetStructs(objPointerSlice interface{}, sql string, args ...interface{}) error + + // GetScan executes a query and scans the result into given variables. + // The pointer can be type of struct/*struct/[]struct/[]*struct. GetScan(pointer interface{}, sql string, args ...interface{}) error + + // GetValue executes a query and returns the first column of first row. + // It's useful for queries like SELECT COUNT(*). GetValue(sql string, args ...interface{}) (Value, error) + + // GetCount executes a query that should return a count value. + // It's a convenient wrapper for count queries. GetCount(sql string, args ...interface{}) (int64, error) // =========================================================================== // CURD. // =========================================================================== + // Insert inserts one or multiple records into table. + // The data can be map/struct/*struct/[]map/[]struct/[]*struct. Insert(table string, data interface{}, batch ...int) (sql.Result, error) + + // InsertIgnore inserts one or multiple records with IGNORE option. + // It ignores records that would cause duplicate key conflicts. InsertIgnore(table string, data interface{}, batch ...int) (sql.Result, error) + + // InsertAndGetId inserts one record and returns its id value. + // It's commonly used with auto-increment primary key. InsertAndGetId(table string, data interface{}, batch ...int) (int64, error) + + // Replace inserts or replaces records using REPLACE INTO syntax. + // Existing records with same unique key will be deleted and re-inserted. Replace(table string, data interface{}, batch ...int) (sql.Result, error) + + // Save inserts or updates records using INSERT ... ON DUPLICATE KEY UPDATE syntax. + // It updates existing records instead of replacing them entirely. Save(table string, data interface{}, batch ...int) (sql.Result, error) + + // Update updates records in table that match given condition. + // The data can be map/struct, and condition supports various formats. Update(table string, data interface{}, condition interface{}, args ...interface{}) (sql.Result, error) + + // Delete deletes records from table that match given condition. + // The condition supports various formats with optional arguments. Delete(table string, condition interface{}, args ...interface{}) (sql.Result, error) // =========================================================================== // Utility methods. // =========================================================================== + // GetCtx returns the context that is bound to current transaction. GetCtx() context.Context + + // GetDB returns the underlying DB interface object. GetDB() DB + + // GetSqlTX returns the underlying *sql.Tx object. + // Note: be very careful when using this method. GetSqlTX() *sql.Tx + + // IsClosed checks if current transaction is closed. + // A transaction is closed after Commit or Rollback. IsClosed() bool // =========================================================================== // Save point feature. // =========================================================================== + // SavePoint creates a save point with given name. + // It's used in nested transactions to create rollback points. SavePoint(point string) error + + // RollbackTo rolls back transaction to previously created save point. + // If the save point doesn't exist, it returns an error. RollbackTo(point string) error } @@ -287,6 +535,7 @@ type DoCommitInput struct { Sql string Args []interface{} Type SqlType + TxOptions sql.TxOptions IsTransaction bool } @@ -380,9 +629,6 @@ const ( defaultMaxIdleConnCount = 10 // Max idle connection count in pool. defaultMaxOpenConnCount = 0 // Max open connection count in pool. Default is no limit. defaultMaxConnLifeTime = 30 * time.Second // Max lifetime for per connection in pool in seconds. - ctxTimeoutTypeExec = 0 - ctxTimeoutTypeQuery = 1 - ctxTimeoutTypePrepare = 2 cachePrefixTableFields = `TableFields:` cachePrefixSelectCache = `SelectCache:` commandEnvKeyForDryRun = "gf.gdb.dryrun" @@ -396,6 +642,15 @@ const ( linkPattern = `(\w+):([\w\-\$]*):(.*?)@(\w+?)\((.+?)\)/{0,1}([^\?]*)\?{0,1}(.*)` ) +type ctxTimeoutType int + +const ( + ctxTimeoutTypeExec ctxTimeoutType = iota + ctxTimeoutTypeQuery + ctxTimeoutTypePrepare + ctxTimeoutTypeTrans +) + type SelectType int const ( @@ -640,7 +895,7 @@ func Instance(name ...string) (db DB, err error) { // The returned node is a clone of configuration node, which is safe for later modification. // // The parameter `master` specifies whether retrieving a master node, or else a slave node -// if master-slave configured. +// if master-slave nodes are configured. func getConfigNodeByGroup(group string, master bool) (*ConfigNode, error) { if list, ok := configs.config[group]; ok { // Separates master and slave configuration nodes array. diff --git a/database/gdb/gdb_core.go b/database/gdb/gdb_core.go index 0a88a17ec..f75ff878f 100644 --- a/database/gdb/gdb_core.go +++ b/database/gdb/gdb_core.go @@ -72,24 +72,30 @@ func (c *Core) GetCtx() context.Context { } // GetCtxTimeout returns the context and cancel function for specified timeout type. -func (c *Core) GetCtxTimeout(ctx context.Context, timeoutType int) (context.Context, context.CancelFunc) { +func (c *Core) GetCtxTimeout(ctx context.Context, timeoutType ctxTimeoutType) (context.Context, context.CancelFunc) { if ctx == nil { ctx = c.db.GetCtx() } else { ctx = context.WithValue(ctx, "WrappedByGetCtxTimeout", nil) } + var config = c.db.GetConfig() switch timeoutType { case ctxTimeoutTypeExec: if c.db.GetConfig().ExecTimeout > 0 { - return context.WithTimeout(ctx, c.db.GetConfig().ExecTimeout) + return context.WithTimeout(ctx, config.ExecTimeout) } case ctxTimeoutTypeQuery: if c.db.GetConfig().QueryTimeout > 0 { - return context.WithTimeout(ctx, c.db.GetConfig().QueryTimeout) + return context.WithTimeout(ctx, config.QueryTimeout) } case ctxTimeoutTypePrepare: if c.db.GetConfig().PrepareTimeout > 0 { - return context.WithTimeout(ctx, c.db.GetConfig().PrepareTimeout) + return context.WithTimeout(ctx, config.PrepareTimeout) + } + + case ctxTimeoutTypeTrans: + if c.db.GetConfig().TranTimeout > 0 { + return context.WithTimeout(ctx, config.TranTimeout) } default: panic(gerror.NewCodef(gcode.CodeInvalidParameter, "invalid context timeout type: %d", timeoutType)) diff --git a/database/gdb/gdb_core_transaction.go b/database/gdb/gdb_core_transaction.go index e098982c2..897b179b1 100644 --- a/database/gdb/gdb_core_transaction.go +++ b/database/gdb/gdb_core_transaction.go @@ -9,25 +9,50 @@ package gdb import ( "context" "database/sql" - "reflect" "github.com/gogf/gf/v2/container/gtype" "github.com/gogf/gf/v2/errors/gcode" "github.com/gogf/gf/v2/errors/gerror" - "github.com/gogf/gf/v2/internal/reflection" - "github.com/gogf/gf/v2/text/gregex" - "github.com/gogf/gf/v2/util/gconv" ) -// TXCore is the struct for transaction management. -type TXCore struct { - db DB // db is the current gdb database manager. - tx *sql.Tx // tx is the raw and underlying transaction manager. - ctx context.Context // ctx is the context for this transaction only. - master *sql.DB // master is the raw and underlying database manager. - transactionId string // transactionId is a unique id generated by this object for this transaction. - transactionCount int // transactionCount marks the times that Begins. - isClosed bool // isClosed marks this transaction has already been committed or rolled back. +// Propagation defines transaction propagation behavior. +type Propagation string + +const ( + // PropagationRequired starts a new transaction if not in a transaction, + // or uses the existing transaction if already in a transaction. + PropagationRequired Propagation = "" // REQUIRED + + // PropagationSupports executes within the existing transaction if present, + // otherwise executes without transaction. + PropagationSupports Propagation = "SUPPORTS" + + // PropagationRequiresNew starts a new transaction, and suspends the current transaction if one exists. + PropagationRequiresNew Propagation = "REQUIRES_NEW" + + // PropagationNested starts a nested transaction if already in a transaction, + // or behaves like PropagationRequired if not in a transaction. + PropagationNested Propagation = "NESTED" + + // PropagationNotSupported executes non-transactional, suspends any existing transaction. + PropagationNotSupported Propagation = "NOT_SUPPORTED" + + // PropagationMandatory executes in a transaction, fails if no existing transaction. + PropagationMandatory Propagation = "MANDATORY" + + // PropagationNever executes non-transactional, fails if in an existing transaction. + PropagationNever Propagation = "NEVER" +) + +// TxOptions defines options for transaction control. +type TxOptions struct { + // Propagation specifies the propagation behavior. + Propagation Propagation + // Isolation is the transaction isolation level. + // If zero, the driver or database's default level is used. + Isolation sql.IsolationLevel + // ReadOnly is used to mark the transaction as read-only. + ReadOnly bool } const ( @@ -38,15 +63,38 @@ const ( var transactionIdGenerator = gtype.NewUint64() +// DefaultTxOptions returns the default transaction options. +func DefaultTxOptions() TxOptions { + return TxOptions{ + Propagation: PropagationRequired, + } +} + // Begin starts and returns the transaction object. // You should call Commit or Rollback functions of the transaction object // if you no longer use the transaction. Commit or Rollback functions will also // close the transaction automatically. func (c *Core) Begin(ctx context.Context) (tx TX, err error) { - return c.doBeginCtx(ctx) + return c.BeginWithOptions(ctx, DefaultTxOptions()) } -func (c *Core) doBeginCtx(ctx context.Context) (TX, error) { +// BeginWithOptions starts and returns the transaction object with given options. +// The options allow specifying the isolation level and read-only mode. +// You should call Commit or Rollback functions of the transaction object +// if you no longer use the transaction. Commit or Rollback functions will also +// close the transaction automatically. +func (c *Core) BeginWithOptions(ctx context.Context, opts TxOptions) (tx TX, err error) { + if ctx == nil { + ctx = c.db.GetCtx() + } + ctx = c.injectInternalCtxData(ctx) + return c.doBeginCtx(ctx, sql.TxOptions{ + Isolation: opts.Isolation, + ReadOnly: opts.ReadOnly, + }) +} + +func (c *Core) doBeginCtx(ctx context.Context, opts sql.TxOptions) (TX, error) { master, err := c.db.Master() if err != nil { return nil, err @@ -56,6 +104,7 @@ func (c *Core) doBeginCtx(ctx context.Context) (TX, error) { Db: master, Sql: "BEGIN", Type: SqlTypeBegin, + TxOptions: opts, IsTransaction: true, }) return out.Tx, err @@ -69,22 +118,105 @@ func (c *Core) doBeginCtx(ctx context.Context) (TX, error) { // Note that, you should not Commit or Rollback the transaction in function `f` // as it is automatically handled by this function. func (c *Core) Transaction(ctx context.Context, f func(ctx context.Context, tx TX) error) (err error) { + return c.TransactionWithOptions(ctx, DefaultTxOptions(), f) +} + +// TransactionWithOptions wraps the transaction logic with propagation options using function `f`. +func (c *Core) TransactionWithOptions( + ctx context.Context, opts TxOptions, f func(ctx context.Context, tx TX) error, +) (err error) { if ctx == nil { ctx = c.db.GetCtx() } ctx = c.injectInternalCtxData(ctx) - // Check transaction object from context. - var tx TX - tx = TXFromCtx(ctx, c.db.GetGroup()) - if tx != nil { - return tx.Transaction(ctx, f) + + // Check current transaction from context + var ( + group = c.db.GetGroup() + currentTx = TXFromCtx(ctx, group) + ) + switch opts.Propagation { + case PropagationRequired: + if currentTx != nil { + return currentTx.Transaction(ctx, f) + } + return c.createNewTransaction(ctx, opts, f) + + case PropagationSupports: + return f(ctx, currentTx) + + case PropagationMandatory: + if currentTx == nil { + return gerror.NewCode( + gcode.CodeInvalidOperation, + "transaction propagation MANDATORY requires an existing transaction", + ) + } + return f(ctx, currentTx) + + case PropagationRequiresNew: + ctx = WithoutTX(ctx, group) + return c.createNewTransaction(ctx, opts, f) + + case PropagationNotSupported: + ctx = WithoutTX(ctx, group) + return f(ctx, nil) + + case PropagationNever: + if currentTx != nil { + return gerror.NewCode( + gcode.CodeInvalidOperation, + "transaction propagation NEVER cannot run within an existing transaction", + ) + } + return f(ctx, nil) + + case PropagationNested: + if currentTx != nil { + // Create savepoint for nested transaction + if err = currentTx.Begin(); err != nil { + return err + } + defer func() { + if err != nil { + if rbErr := currentTx.Rollback(); rbErr != nil { + err = gerror.Wrap(err, rbErr.Error()) + } + } + }() + return f(ctx, currentTx) + } + return c.createNewTransaction(ctx, opts, f) + + default: + return gerror.NewCodef( + gcode.CodeInvalidParameter, + "unsupported propagation behavior: %s", + opts.Propagation, + ) } - tx, err = c.doBeginCtx(ctx) +} + +// createNewTransaction handles creating and managing a new transaction +func (c *Core) createNewTransaction( + ctx context.Context, opts TxOptions, f func(ctx context.Context, tx TX) error, +) (err error) { + // Begin transaction with options + tx, err := c.doBeginCtx(ctx, sql.TxOptions{ + Isolation: opts.Isolation, + ReadOnly: opts.ReadOnly, + }) if err != nil { return err } - // Inject transaction object into context. - tx = tx.Ctx(WithTX(tx.GetCtx(), tx)) + + // Inject transaction object into context + ctx = WithTX(tx.GetCtx(), tx) + err = callTxFunc(tx.Ctx(ctx), f) + return +} + +func callTxFunc(tx TX, f func(ctx context.Context, tx TX) error) (err error) { defer func() { if err == nil { if exception := recover(); exception != nil { @@ -128,6 +260,12 @@ func WithTX(ctx context.Context, tx TX) context.Context { return ctx } +// WithoutTX removed transaction object from context and returns a new context. +func WithoutTX(ctx context.Context, group string) context.Context { + ctx = context.WithValue(ctx, transactionKeyForContext(group), nil) + return ctx +} + // TXFromCtx retrieves and returns transaction object from context. // It is usually used in nested transaction feature, and it returns nil if it is not set previously. func TXFromCtx(ctx context.Context, group string) TX { @@ -150,395 +288,3 @@ func TXFromCtx(ctx context.Context, group string) TX { func transactionKeyForContext(group string) string { return contextTransactionKeyPrefix + group } - -// transactionKeyForNestedPoint forms and returns the transaction key at current save point. -func (tx *TXCore) transactionKeyForNestedPoint() string { - return tx.db.GetCore().QuoteWord(transactionPointerPrefix + gconv.String(tx.transactionCount)) -} - -// Ctx sets the context for current transaction. -func (tx *TXCore) Ctx(ctx context.Context) TX { - tx.ctx = ctx - if tx.ctx != nil { - tx.ctx = tx.db.GetCore().injectInternalCtxData(tx.ctx) - } - return tx -} - -// GetCtx returns the context for current transaction. -func (tx *TXCore) GetCtx() context.Context { - return tx.ctx -} - -// GetDB returns the DB for current transaction. -func (tx *TXCore) GetDB() DB { - return tx.db -} - -// GetSqlTX returns the underlying transaction object for current transaction. -func (tx *TXCore) GetSqlTX() *sql.Tx { - return tx.tx -} - -// Commit commits current transaction. -// Note that it releases previous saved transaction point if it's in a nested transaction procedure, -// or else it commits the hole transaction. -func (tx *TXCore) Commit() error { - if tx.transactionCount > 0 { - tx.transactionCount-- - _, err := tx.Exec("RELEASE SAVEPOINT " + tx.transactionKeyForNestedPoint()) - return err - } - _, err := tx.db.DoCommit(tx.ctx, DoCommitInput{ - Tx: tx.tx, - Sql: "COMMIT", - Type: SqlTypeTXCommit, - IsTransaction: true, - }) - if err == nil { - tx.isClosed = true - } - return err -} - -// Rollback aborts current transaction. -// Note that it aborts current transaction if it's in a nested transaction procedure, -// or else it aborts the hole transaction. -func (tx *TXCore) Rollback() error { - if tx.transactionCount > 0 { - tx.transactionCount-- - _, err := tx.Exec("ROLLBACK TO SAVEPOINT " + tx.transactionKeyForNestedPoint()) - return err - } - _, err := tx.db.DoCommit(tx.ctx, DoCommitInput{ - Tx: tx.tx, - Sql: "ROLLBACK", - Type: SqlTypeTXRollback, - IsTransaction: true, - }) - if err == nil { - tx.isClosed = true - } - return err -} - -// IsClosed checks and returns this transaction has already been committed or rolled back. -func (tx *TXCore) IsClosed() bool { - return tx.isClosed -} - -// Begin starts a nested transaction procedure. -func (tx *TXCore) Begin() error { - _, err := tx.Exec("SAVEPOINT " + tx.transactionKeyForNestedPoint()) - if err != nil { - return err - } - tx.transactionCount++ - return nil -} - -// SavePoint performs `SAVEPOINT xxx` SQL statement that saves transaction at current point. -// The parameter `point` specifies the point name that will be saved to server. -func (tx *TXCore) SavePoint(point string) error { - _, err := tx.Exec("SAVEPOINT " + tx.db.GetCore().QuoteWord(point)) - return err -} - -// RollbackTo performs `ROLLBACK TO SAVEPOINT xxx` SQL statement that rollbacks to specified saved transaction. -// The parameter `point` specifies the point name that was saved previously. -func (tx *TXCore) RollbackTo(point string) error { - _, err := tx.Exec("ROLLBACK TO SAVEPOINT " + tx.db.GetCore().QuoteWord(point)) - return err -} - -// Transaction wraps the transaction logic using function `f`. -// It rollbacks the transaction and returns the error from function `f` if -// it returns non-nil error. It commits the transaction and returns nil if -// function `f` returns nil. -// -// Note that, you should not Commit or Rollback the transaction in function `f` -// as it is automatically handled by this function. -func (tx *TXCore) Transaction(ctx context.Context, f func(ctx context.Context, tx TX) error) (err error) { - if ctx != nil { - tx.ctx = ctx - } - // Check transaction object from context. - if TXFromCtx(tx.ctx, tx.db.GetGroup()) == nil { - // Inject transaction object into context. - tx.ctx = WithTX(tx.ctx, tx) - } - err = tx.Begin() - if err != nil { - return err - } - defer func() { - if err == nil { - if exception := recover(); exception != nil { - if v, ok := exception.(error); ok && gerror.HasStack(v) { - err = v - } else { - err = gerror.NewCodef(gcode.CodeInternalPanic, "%+v", exception) - } - } - } - if err != nil { - if e := tx.Rollback(); e != nil { - err = e - } - } else { - if e := tx.Commit(); e != nil { - err = e - } - } - }() - err = f(tx.ctx, tx) - return -} - -// Query does query operation on transaction. -// See Core.Query. -func (tx *TXCore) Query(sql string, args ...interface{}) (result Result, err error) { - return tx.db.DoQuery(tx.ctx, &txLink{tx.tx}, sql, args...) -} - -// Exec does none query operation on transaction. -// See Core.Exec. -func (tx *TXCore) Exec(sql string, args ...interface{}) (sql.Result, error) { - return tx.db.DoExec(tx.ctx, &txLink{tx.tx}, sql, args...) -} - -// Prepare creates a prepared statement for later queries or executions. -// Multiple queries or executions may be run concurrently from the -// returned statement. -// The caller must call the statement's Close method -// when the statement is no longer needed. -func (tx *TXCore) Prepare(sql string) (*Stmt, error) { - return tx.db.DoPrepare(tx.ctx, &txLink{tx.tx}, sql) -} - -// GetAll queries and returns data records from database. -func (tx *TXCore) GetAll(sql string, args ...interface{}) (Result, error) { - return tx.Query(sql, args...) -} - -// GetOne queries and returns one record from database. -func (tx *TXCore) GetOne(sql string, args ...interface{}) (Record, error) { - list, err := tx.GetAll(sql, args...) - if err != nil { - return nil, err - } - if len(list) > 0 { - return list[0], nil - } - return nil, nil -} - -// GetStruct queries one record from database and converts it to given struct. -// The parameter `pointer` should be a pointer to struct. -func (tx *TXCore) GetStruct(obj interface{}, sql string, args ...interface{}) error { - one, err := tx.GetOne(sql, args...) - if err != nil { - return err - } - return one.Struct(obj) -} - -// GetStructs queries records from database and converts them to given struct. -// The parameter `pointer` should be type of struct slice: []struct/[]*struct. -func (tx *TXCore) GetStructs(objPointerSlice interface{}, sql string, args ...interface{}) error { - all, err := tx.GetAll(sql, args...) - if err != nil { - return err - } - return all.Structs(objPointerSlice) -} - -// GetScan queries one or more records from database and converts them to given struct or -// struct array. -// -// If parameter `pointer` is type of struct pointer, it calls GetStruct internally for -// the conversion. If parameter `pointer` is type of slice, it calls GetStructs internally -// for conversion. -func (tx *TXCore) GetScan(pointer interface{}, sql string, args ...interface{}) error { - reflectInfo := reflection.OriginTypeAndKind(pointer) - if reflectInfo.InputKind != reflect.Ptr { - return gerror.NewCodef( - gcode.CodeInvalidParameter, - "params should be type of pointer, but got: %v", - reflectInfo.InputKind, - ) - } - switch reflectInfo.OriginKind { - case reflect.Array, reflect.Slice: - return tx.GetStructs(pointer, sql, args...) - - case reflect.Struct: - return tx.GetStruct(pointer, sql, args...) - } - return gerror.NewCodef( - gcode.CodeInvalidParameter, - `in valid parameter type "%v", of which element type should be type of struct/slice`, - reflectInfo.InputType, - ) -} - -// GetValue queries and returns the field value from database. -// The sql should query only one field from database, or else it returns only one -// field of the result. -func (tx *TXCore) GetValue(sql string, args ...interface{}) (Value, error) { - one, err := tx.GetOne(sql, args...) - if err != nil { - return nil, err - } - for _, v := range one { - return v, nil - } - return nil, nil -} - -// GetCount queries and returns the count from database. -func (tx *TXCore) GetCount(sql string, args ...interface{}) (int64, error) { - if !gregex.IsMatchString(`(?i)SELECT\s+COUNT\(.+\)\s+FROM`, sql) { - sql, _ = gregex.ReplaceString(`(?i)(SELECT)\s+(.+)\s+(FROM)`, `$1 COUNT($2) $3`, sql) - } - value, err := tx.GetValue(sql, args...) - if err != nil { - return 0, err - } - return value.Int64(), nil -} - -// Insert does "INSERT INTO ..." statement for the table. -// If there's already one unique record of the data in the table, it returns error. -// -// The parameter `data` can be type of map/gmap/struct/*struct/[]map/[]struct, etc. -// Eg: -// Data(g.Map{"uid": 10000, "name":"john"}) -// Data(g.Slice{g.Map{"uid": 10000, "name":"john"}, g.Map{"uid": 20000, "name":"smith"}) -// -// The parameter `batch` specifies the batch operation count when given data is slice. -func (tx *TXCore) Insert(table string, data interface{}, batch ...int) (sql.Result, error) { - if len(batch) > 0 { - return tx.Model(table).Ctx(tx.ctx).Data(data).Batch(batch[0]).Insert() - } - return tx.Model(table).Ctx(tx.ctx).Data(data).Insert() -} - -// InsertIgnore does "INSERT IGNORE INTO ..." statement for the table. -// If there's already one unique record of the data in the table, it ignores the inserting. -// -// The parameter `data` can be type of map/gmap/struct/*struct/[]map/[]struct, etc. -// Eg: -// Data(g.Map{"uid": 10000, "name":"john"}) -// Data(g.Slice{g.Map{"uid": 10000, "name":"john"}, g.Map{"uid": 20000, "name":"smith"}) -// -// The parameter `batch` specifies the batch operation count when given data is slice. -func (tx *TXCore) InsertIgnore(table string, data interface{}, batch ...int) (sql.Result, error) { - if len(batch) > 0 { - return tx.Model(table).Ctx(tx.ctx).Data(data).Batch(batch[0]).InsertIgnore() - } - return tx.Model(table).Ctx(tx.ctx).Data(data).InsertIgnore() -} - -// InsertAndGetId performs action Insert and returns the last insert id that automatically generated. -func (tx *TXCore) InsertAndGetId(table string, data interface{}, batch ...int) (int64, error) { - if len(batch) > 0 { - return tx.Model(table).Ctx(tx.ctx).Data(data).Batch(batch[0]).InsertAndGetId() - } - return tx.Model(table).Ctx(tx.ctx).Data(data).InsertAndGetId() -} - -// Replace does "REPLACE INTO ..." statement for the table. -// If there's already one unique record of the data in the table, it deletes the record -// and inserts a new one. -// -// The parameter `data` can be type of map/gmap/struct/*struct/[]map/[]struct, etc. -// Eg: -// Data(g.Map{"uid": 10000, "name":"john"}) -// Data(g.Slice{g.Map{"uid": 10000, "name":"john"}, g.Map{"uid": 20000, "name":"smith"}) -// -// The parameter `data` can be type of map/gmap/struct/*struct/[]map/[]struct, etc. -// If given data is type of slice, it then does batch replacing, and the optional parameter -// `batch` specifies the batch operation count. -func (tx *TXCore) Replace(table string, data interface{}, batch ...int) (sql.Result, error) { - if len(batch) > 0 { - return tx.Model(table).Ctx(tx.ctx).Data(data).Batch(batch[0]).Replace() - } - return tx.Model(table).Ctx(tx.ctx).Data(data).Replace() -} - -// Save does "INSERT INTO ... ON DUPLICATE KEY UPDATE..." statement for the table. -// It updates the record if there's primary or unique index in the saving data, -// or else it inserts a new record into the table. -// -// The parameter `data` can be type of map/gmap/struct/*struct/[]map/[]struct, etc. -// Eg: -// Data(g.Map{"uid": 10000, "name":"john"}) -// Data(g.Slice{g.Map{"uid": 10000, "name":"john"}, g.Map{"uid": 20000, "name":"smith"}) -// -// If given data is type of slice, it then does batch saving, and the optional parameter -// `batch` specifies the batch operation count. -func (tx *TXCore) Save(table string, data interface{}, batch ...int) (sql.Result, error) { - if len(batch) > 0 { - return tx.Model(table).Ctx(tx.ctx).Data(data).Batch(batch[0]).Save() - } - return tx.Model(table).Ctx(tx.ctx).Data(data).Save() -} - -// Update does "UPDATE ... " statement for the table. -// -// The parameter `data` can be type of string/map/gmap/struct/*struct, etc. -// Eg: "uid=10000", "uid", 10000, g.Map{"uid": 10000, "name":"john"} -// -// The parameter `condition` can be type of string/map/gmap/slice/struct/*struct, etc. -// It is commonly used with parameter `args`. -// Eg: -// "uid=10000", -// "uid", 10000 -// "money>? AND name like ?", 99999, "vip_%" -// "status IN (?)", g.Slice{1,2,3} -// "age IN(?,?)", 18, 50 -// User{ Id : 1, UserName : "john"}. -func (tx *TXCore) Update(table string, data interface{}, condition interface{}, args ...interface{}) (sql.Result, error) { - return tx.Model(table).Ctx(tx.ctx).Data(data).Where(condition, args...).Update() -} - -// Delete does "DELETE FROM ... " statement for the table. -// -// The parameter `condition` can be type of string/map/gmap/slice/struct/*struct, etc. -// It is commonly used with parameter `args`. -// Eg: -// "uid=10000", -// "uid", 10000 -// "money>? AND name like ?", 99999, "vip_%" -// "status IN (?)", g.Slice{1,2,3} -// "age IN(?,?)", 18, 50 -// User{ Id : 1, UserName : "john"}. -func (tx *TXCore) Delete(table string, condition interface{}, args ...interface{}) (sql.Result, error) { - return tx.Model(table).Ctx(tx.ctx).Where(condition, args...).Delete() -} - -// QueryContext implements interface function Link.QueryContext. -func (tx *TXCore) QueryContext(ctx context.Context, sql string, args ...interface{}) (*sql.Rows, error) { - return tx.tx.QueryContext(ctx, sql, args...) -} - -// ExecContext implements interface function Link.ExecContext. -func (tx *TXCore) ExecContext(ctx context.Context, sql string, args ...interface{}) (sql.Result, error) { - return tx.tx.ExecContext(ctx, sql, args...) -} - -// PrepareContext implements interface function Link.PrepareContext. -func (tx *TXCore) PrepareContext(ctx context.Context, sql string) (*sql.Stmt, error) { - return tx.tx.PrepareContext(ctx, sql) -} - -// IsOnMaster implements interface function Link.IsOnMaster. -func (tx *TXCore) IsOnMaster() bool { - return true -} - -// IsTransaction implements interface function Link.IsTransaction. -func (tx *TXCore) IsTransaction() bool { - return true -} diff --git a/database/gdb/gdb_core_txcore.go b/database/gdb/gdb_core_txcore.go new file mode 100644 index 000000000..187ea27cd --- /dev/null +++ b/database/gdb/gdb_core_txcore.go @@ -0,0 +1,412 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +package gdb + +import ( + "context" + "database/sql" + "reflect" + + "github.com/gogf/gf/v2/errors/gcode" + "github.com/gogf/gf/v2/errors/gerror" + "github.com/gogf/gf/v2/internal/reflection" + "github.com/gogf/gf/v2/text/gregex" + "github.com/gogf/gf/v2/util/gconv" +) + +// TXCore is the struct for transaction management. +type TXCore struct { + db DB // db is the current gdb database manager. + tx *sql.Tx // tx is the raw and underlying transaction manager. + ctx context.Context // ctx is the context for this transaction only. + master *sql.DB // master is the raw and underlying database manager. + transactionId string // transactionId is a unique id generated by this object for this transaction. + transactionCount int // transactionCount marks the times that Begins. + isClosed bool // isClosed marks this transaction has already been committed or rolled back. +} + +// transactionKeyForNestedPoint forms and returns the transaction key at current save point. +func (tx *TXCore) transactionKeyForNestedPoint() string { + return tx.db.GetCore().QuoteWord( + transactionPointerPrefix + gconv.String(tx.transactionCount), + ) +} + +// Ctx sets the context for current transaction. +func (tx *TXCore) Ctx(ctx context.Context) TX { + tx.ctx = ctx + if tx.ctx != nil { + tx.ctx = tx.db.GetCore().injectInternalCtxData(tx.ctx) + } + return tx +} + +// GetCtx returns the context for current transaction. +func (tx *TXCore) GetCtx() context.Context { + return tx.ctx +} + +// GetDB returns the DB for current transaction. +func (tx *TXCore) GetDB() DB { + return tx.db +} + +// GetSqlTX returns the underlying transaction object for current transaction. +func (tx *TXCore) GetSqlTX() *sql.Tx { + return tx.tx +} + +// Commit commits current transaction. +// Note that it releases previous saved transaction point if it's in a nested transaction procedure, +// or else it commits the hole transaction. +func (tx *TXCore) Commit() error { + if tx.transactionCount > 0 { + tx.transactionCount-- + _, err := tx.Exec("RELEASE SAVEPOINT " + tx.transactionKeyForNestedPoint()) + return err + } + _, err := tx.db.DoCommit(tx.ctx, DoCommitInput{ + Tx: tx.tx, + Sql: "COMMIT", + Type: SqlTypeTXCommit, + IsTransaction: true, + }) + if err == nil { + tx.isClosed = true + } + return err +} + +// Rollback aborts current transaction. +// Note that it aborts current transaction if it's in a nested transaction procedure, +// or else it aborts the hole transaction. +func (tx *TXCore) Rollback() error { + if tx.transactionCount > 0 { + tx.transactionCount-- + _, err := tx.Exec("ROLLBACK TO SAVEPOINT " + tx.transactionKeyForNestedPoint()) + return err + } + _, err := tx.db.DoCommit(tx.ctx, DoCommitInput{ + Tx: tx.tx, + Sql: "ROLLBACK", + Type: SqlTypeTXRollback, + IsTransaction: true, + }) + if err == nil { + tx.isClosed = true + } + return err +} + +// IsClosed checks and returns this transaction has already been committed or rolled back. +func (tx *TXCore) IsClosed() bool { + return tx.isClosed +} + +// Begin starts a nested transaction procedure. +func (tx *TXCore) Begin() error { + _, err := tx.Exec("SAVEPOINT " + tx.transactionKeyForNestedPoint()) + if err != nil { + return err + } + tx.transactionCount++ + return nil +} + +// SavePoint performs `SAVEPOINT xxx` SQL statement that saves transaction at current point. +// The parameter `point` specifies the point name that will be saved to server. +func (tx *TXCore) SavePoint(point string) error { + _, err := tx.Exec("SAVEPOINT " + tx.db.GetCore().QuoteWord(point)) + return err +} + +// RollbackTo performs `ROLLBACK TO SAVEPOINT xxx` SQL statement that rollbacks to specified saved transaction. +// The parameter `point` specifies the point name that was saved previously. +func (tx *TXCore) RollbackTo(point string) error { + _, err := tx.Exec("ROLLBACK TO SAVEPOINT " + tx.db.GetCore().QuoteWord(point)) + return err +} + +// Transaction wraps the transaction logic using function `f`. +// It rollbacks the transaction and returns the error from function `f` if +// it returns non-nil error. It commits the transaction and returns nil if +// function `f` returns nil. +// +// Note that, you should not Commit or Rollback the transaction in function `f` +// as it is automatically handled by this function. +func (tx *TXCore) Transaction(ctx context.Context, f func(ctx context.Context, tx TX) error) (err error) { + if ctx != nil { + tx.ctx = ctx + } + // Check transaction object from context. + if TXFromCtx(tx.ctx, tx.db.GetGroup()) == nil { + // Inject transaction object into context. + tx.ctx = WithTX(tx.ctx, tx) + } + if err = tx.Begin(); err != nil { + return err + } + err = callTxFunc(tx, f) + return +} + +// TransactionWithOptions wraps the transaction logic with propagation options using function `f`. +func (tx *TXCore) TransactionWithOptions( + ctx context.Context, opts TxOptions, f func(ctx context.Context, tx TX) error, +) (err error) { + return tx.db.TransactionWithOptions(ctx, opts, f) +} + +// Query does query operation on transaction. +// See Core.Query. +func (tx *TXCore) Query(sql string, args ...interface{}) (result Result, err error) { + return tx.db.DoQuery(tx.ctx, &txLink{tx.tx}, sql, args...) +} + +// Exec does none query operation on transaction. +// See Core.Exec. +func (tx *TXCore) Exec(sql string, args ...interface{}) (sql.Result, error) { + return tx.db.DoExec(tx.ctx, &txLink{tx.tx}, sql, args...) +} + +// Prepare creates a prepared statement for later queries or executions. +// Multiple queries or executions may be run concurrently from the +// returned statement. +// The caller must call the statement's Close method +// when the statement is no longer needed. +func (tx *TXCore) Prepare(sql string) (*Stmt, error) { + return tx.db.DoPrepare(tx.ctx, &txLink{tx.tx}, sql) +} + +// GetAll queries and returns data records from database. +func (tx *TXCore) GetAll(sql string, args ...interface{}) (Result, error) { + return tx.Query(sql, args...) +} + +// GetOne queries and returns one record from database. +func (tx *TXCore) GetOne(sql string, args ...interface{}) (Record, error) { + list, err := tx.GetAll(sql, args...) + if err != nil { + return nil, err + } + if len(list) > 0 { + return list[0], nil + } + return nil, nil +} + +// GetStruct queries one record from database and converts it to given struct. +// The parameter `pointer` should be a pointer to struct. +func (tx *TXCore) GetStruct(obj interface{}, sql string, args ...interface{}) error { + one, err := tx.GetOne(sql, args...) + if err != nil { + return err + } + return one.Struct(obj) +} + +// GetStructs queries records from database and converts them to given struct. +// The parameter `pointer` should be type of struct slice: []struct/[]*struct. +func (tx *TXCore) GetStructs(objPointerSlice interface{}, sql string, args ...interface{}) error { + all, err := tx.GetAll(sql, args...) + if err != nil { + return err + } + return all.Structs(objPointerSlice) +} + +// GetScan queries one or more records from database and converts them to given struct or +// struct array. +// +// If parameter `pointer` is type of struct pointer, it calls GetStruct internally for +// the conversion. If parameter `pointer` is type of slice, it calls GetStructs internally +// for conversion. +func (tx *TXCore) GetScan(pointer interface{}, sql string, args ...interface{}) error { + reflectInfo := reflection.OriginTypeAndKind(pointer) + if reflectInfo.InputKind != reflect.Ptr { + return gerror.NewCodef( + gcode.CodeInvalidParameter, + "params should be type of pointer, but got: %v", + reflectInfo.InputKind, + ) + } + switch reflectInfo.OriginKind { + case reflect.Array, reflect.Slice: + return tx.GetStructs(pointer, sql, args...) + + case reflect.Struct: + return tx.GetStruct(pointer, sql, args...) + + default: + } + return gerror.NewCodef( + gcode.CodeInvalidParameter, + `in valid parameter type "%v", of which element type should be type of struct/slice`, + reflectInfo.InputType, + ) +} + +// GetValue queries and returns the field value from database. +// The sql should query only one field from database, or else it returns only one +// field of the result. +func (tx *TXCore) GetValue(sql string, args ...interface{}) (Value, error) { + one, err := tx.GetOne(sql, args...) + if err != nil { + return nil, err + } + for _, v := range one { + return v, nil + } + return nil, nil +} + +// GetCount queries and returns the count from database. +func (tx *TXCore) GetCount(sql string, args ...interface{}) (int64, error) { + if !gregex.IsMatchString(`(?i)SELECT\s+COUNT\(.+\)\s+FROM`, sql) { + sql, _ = gregex.ReplaceString(`(?i)(SELECT)\s+(.+)\s+(FROM)`, `$1 COUNT($2) $3`, sql) + } + value, err := tx.GetValue(sql, args...) + if err != nil { + return 0, err + } + return value.Int64(), nil +} + +// Insert does "INSERT INTO ..." statement for the table. +// If there's already one unique record of the data in the table, it returns error. +// +// The parameter `data` can be type of map/gmap/struct/*struct/[]map/[]struct, etc. +// Eg: +// Data(g.Map{"uid": 10000, "name":"john"}) +// Data(g.Slice{g.Map{"uid": 10000, "name":"john"}, g.Map{"uid": 20000, "name":"smith"}) +// +// The parameter `batch` specifies the batch operation count when given data is slice. +func (tx *TXCore) Insert(table string, data interface{}, batch ...int) (sql.Result, error) { + if len(batch) > 0 { + return tx.Model(table).Ctx(tx.ctx).Data(data).Batch(batch[0]).Insert() + } + return tx.Model(table).Ctx(tx.ctx).Data(data).Insert() +} + +// InsertIgnore does "INSERT IGNORE INTO ..." statement for the table. +// If there's already one unique record of the data in the table, it ignores the inserting. +// +// The parameter `data` can be type of map/gmap/struct/*struct/[]map/[]struct, etc. +// Eg: +// Data(g.Map{"uid": 10000, "name":"john"}) +// Data(g.Slice{g.Map{"uid": 10000, "name":"john"}, g.Map{"uid": 20000, "name":"smith"}) +// +// The parameter `batch` specifies the batch operation count when given data is slice. +func (tx *TXCore) InsertIgnore(table string, data interface{}, batch ...int) (sql.Result, error) { + if len(batch) > 0 { + return tx.Model(table).Ctx(tx.ctx).Data(data).Batch(batch[0]).InsertIgnore() + } + return tx.Model(table).Ctx(tx.ctx).Data(data).InsertIgnore() +} + +// InsertAndGetId performs action Insert and returns the last insert id that automatically generated. +func (tx *TXCore) InsertAndGetId(table string, data interface{}, batch ...int) (int64, error) { + if len(batch) > 0 { + return tx.Model(table).Ctx(tx.ctx).Data(data).Batch(batch[0]).InsertAndGetId() + } + return tx.Model(table).Ctx(tx.ctx).Data(data).InsertAndGetId() +} + +// Replace does "REPLACE INTO ..." statement for the table. +// If there's already one unique record of the data in the table, it deletes the record +// and inserts a new one. +// +// The parameter `data` can be type of map/gmap/struct/*struct/[]map/[]struct, etc. +// Eg: +// Data(g.Map{"uid": 10000, "name":"john"}) +// Data(g.Slice{g.Map{"uid": 10000, "name":"john"}, g.Map{"uid": 20000, "name":"smith"}) +// +// The parameter `data` can be type of map/gmap/struct/*struct/[]map/[]struct, etc. +// If given data is type of slice, it then does batch replacing, and the optional parameter +// `batch` specifies the batch operation count. +func (tx *TXCore) Replace(table string, data interface{}, batch ...int) (sql.Result, error) { + if len(batch) > 0 { + return tx.Model(table).Ctx(tx.ctx).Data(data).Batch(batch[0]).Replace() + } + return tx.Model(table).Ctx(tx.ctx).Data(data).Replace() +} + +// Save does "INSERT INTO ... ON DUPLICATE KEY UPDATE..." statement for the table. +// It updates the record if there's primary or unique index in the saving data, +// or else it inserts a new record into the table. +// +// The parameter `data` can be type of map/gmap/struct/*struct/[]map/[]struct, etc. +// Eg: +// Data(g.Map{"uid": 10000, "name":"john"}) +// Data(g.Slice{g.Map{"uid": 10000, "name":"john"}, g.Map{"uid": 20000, "name":"smith"}) +// +// If given data is type of slice, it then does batch saving, and the optional parameter +// `batch` specifies the batch operation count. +func (tx *TXCore) Save(table string, data interface{}, batch ...int) (sql.Result, error) { + if len(batch) > 0 { + return tx.Model(table).Ctx(tx.ctx).Data(data).Batch(batch[0]).Save() + } + return tx.Model(table).Ctx(tx.ctx).Data(data).Save() +} + +// Update does "UPDATE ... " statement for the table. +// +// The parameter `data` can be type of string/map/gmap/struct/*struct, etc. +// Eg: "uid=10000", "uid", 10000, g.Map{"uid": 10000, "name":"john"} +// +// The parameter `condition` can be type of string/map/gmap/slice/struct/*struct, etc. +// It is commonly used with parameter `args`. +// Eg: +// "uid=10000", +// "uid", 10000 +// "money>? AND name like ?", 99999, "vip_%" +// "status IN (?)", g.Slice{1,2,3} +// "age IN(?,?)", 18, 50 +// User{ Id : 1, UserName : "john"}. +func (tx *TXCore) Update(table string, data interface{}, condition interface{}, args ...interface{}) (sql.Result, error) { + return tx.Model(table).Ctx(tx.ctx).Data(data).Where(condition, args...).Update() +} + +// Delete does "DELETE FROM ... " statement for the table. +// +// The parameter `condition` can be type of string/map/gmap/slice/struct/*struct, etc. +// It is commonly used with parameter `args`. +// Eg: +// "uid=10000", +// "uid", 10000 +// "money>? AND name like ?", 99999, "vip_%" +// "status IN (?)", g.Slice{1,2,3} +// "age IN(?,?)", 18, 50 +// User{ Id : 1, UserName : "john"}. +func (tx *TXCore) Delete(table string, condition interface{}, args ...interface{}) (sql.Result, error) { + return tx.Model(table).Ctx(tx.ctx).Where(condition, args...).Delete() +} + +// QueryContext implements interface function Link.QueryContext. +func (tx *TXCore) QueryContext(ctx context.Context, sql string, args ...interface{}) (*sql.Rows, error) { + return tx.tx.QueryContext(ctx, sql, args...) +} + +// ExecContext implements interface function Link.ExecContext. +func (tx *TXCore) ExecContext(ctx context.Context, sql string, args ...interface{}) (sql.Result, error) { + return tx.tx.ExecContext(ctx, sql, args...) +} + +// PrepareContext implements interface function Link.PrepareContext. +func (tx *TXCore) PrepareContext(ctx context.Context, sql string) (*sql.Stmt, error) { + return tx.tx.PrepareContext(ctx, sql) +} + +// IsOnMaster implements interface function Link.IsOnMaster. +func (tx *TXCore) IsOnMaster() bool { + return true +} + +// IsTransaction implements interface function Link.IsTransaction. +func (tx *TXCore) IsTransaction() bool { + return true +} diff --git a/database/gdb/gdb_core_underlying.go b/database/gdb/gdb_core_underlying.go index dbdc59eb3..166f5785b 100644 --- a/database/gdb/gdb_core_underlying.go +++ b/database/gdb/gdb_core_underlying.go @@ -187,7 +187,13 @@ func (c *Core) DoCommit(ctx context.Context, in DoCommitInput) (out DoCommitOutp // Execution cased by type. switch in.Type { case SqlTypeBegin: - if sqlTx, err = in.Db.Begin(); err == nil { + ctx, cancelFuncForTimeout = c.GetCtxTimeout(ctx, ctxTimeoutTypeTrans) + defer cancelFuncForTimeout() + formattedSql = fmt.Sprintf( + `%s (IosolationLevel: %s, ReadOnly: %t)`, + formattedSql, in.TxOptions.Isolation.String(), in.TxOptions.ReadOnly, + ) + if sqlTx, err = in.Db.BeginTx(ctx, &in.TxOptions); err == nil { out.Tx = &TXCore{ db: c.db, tx: sqlTx, @@ -206,6 +212,8 @@ func (c *Core) DoCommit(ctx context.Context, in DoCommitInput) (out DoCommitOutp err = in.Tx.Rollback() case SqlTypeExecContext: + ctx, cancelFuncForTimeout = c.GetCtxTimeout(ctx, ctxTimeoutTypeExec) + defer cancelFuncForTimeout() if c.db.GetDryRun() { sqlResult = new(SqlResult) } else { @@ -214,10 +222,14 @@ func (c *Core) DoCommit(ctx context.Context, in DoCommitInput) (out DoCommitOutp out.RawResult = sqlResult case SqlTypeQueryContext: + ctx, cancelFuncForTimeout = c.GetCtxTimeout(ctx, ctxTimeoutTypeQuery) + defer cancelFuncForTimeout() sqlRows, err = in.Link.QueryContext(ctx, in.Sql, in.Args...) out.RawResult = sqlRows case SqlTypePrepareContext: + ctx, cancelFuncForTimeout = c.GetCtxTimeout(ctx, ctxTimeoutTypePrepare) + defer cancelFuncForTimeout() sqlStmt, err = in.Link.PrepareContext(ctx, in.Sql) out.RawResult = sqlStmt diff --git a/database/gdb/gdb_model_transaction.go b/database/gdb/gdb_model_transaction.go index e186db37f..c4b577ec9 100644 --- a/database/gdb/gdb_model_transaction.go +++ b/database/gdb/gdb_model_transaction.go @@ -26,3 +26,17 @@ func (m *Model) Transaction(ctx context.Context, f func(ctx context.Context, tx } return m.db.Transaction(ctx, f) } + +// TransactionWithOptions executes transaction with options. +// The parameter `opts` specifies the transaction options. +// The parameter `f` specifies the function that will be called within the transaction. +// If f returns error, the transaction will be rolled back, or else the transaction will be committed. +func (m *Model) TransactionWithOptions(ctx context.Context, opts TxOptions, f func(ctx context.Context, tx TX) error) (err error) { + if ctx == nil { + ctx = m.GetCtx() + } + if m.tx != nil { + return m.tx.Transaction(ctx, f) + } + return m.db.TransactionWithOptions(ctx, opts, f) +}