mirror of
https://gitee.com/johng/gf
synced 2026-06-07 02:12:11 +08:00
完善gdb包事务操作
This commit is contained in:
@ -49,7 +49,7 @@ type Link interface {
|
||||
SetMaxOpenConns(n int)
|
||||
|
||||
// 开启事务操作
|
||||
Begin() (*sql.Tx, error)
|
||||
Begin() (*Tx, error)
|
||||
|
||||
// 数据表插入/更新/保存操作
|
||||
Insert(table string, data Map) (sql.Result, error)
|
||||
|
||||
@ -12,7 +12,6 @@ import (
|
||||
"errors"
|
||||
"strings"
|
||||
"database/sql"
|
||||
"gitee.com/johng/gf/g/os/glog"
|
||||
)
|
||||
|
||||
// 关闭链接
|
||||
@ -22,7 +21,6 @@ func (db *Db) Close() error {
|
||||
if err == nil {
|
||||
db.master = nil
|
||||
} else {
|
||||
glog.Fatal(err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -31,7 +29,6 @@ func (db *Db) Close() error {
|
||||
if err == nil {
|
||||
db.slave = nil
|
||||
} else {
|
||||
glog.Fatal(err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -156,8 +153,15 @@ func (db *Db) SetMaxOpenConns(n int) {
|
||||
}
|
||||
|
||||
// 事务操作,开启,会返回一个底层的事务操作对象链接如需要嵌套事务,那么可以使用该对象,否则请忽略
|
||||
func (db *Db) Begin() (*sql.Tx, error) {
|
||||
return db.master.Begin()
|
||||
func (db *Db) Begin() (*Tx, error) {
|
||||
if tx, err := db.master.Begin(); err == nil {
|
||||
return &Tx{
|
||||
db : db,
|
||||
tx : tx,
|
||||
}, nil
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// 根据insert选项获得操作名称
|
||||
|
||||
@ -15,7 +15,8 @@ import (
|
||||
|
||||
// 数据库链式操作对象
|
||||
type DbOp struct {
|
||||
link Link // 数据库链接对象
|
||||
tx *Tx // 数据库事务对象
|
||||
db *Db // 数据库操作对象
|
||||
tables string // 数据库操作表
|
||||
fields string // 操作字段
|
||||
where string // 操作条件
|
||||
@ -31,8 +32,8 @@ type DbOp struct {
|
||||
// 链式操作,数据表字段,可支持多个表,以半角逗号连接
|
||||
func (db *Db) Table(tables string) (*DbOp) {
|
||||
return &DbOp {
|
||||
link : db.link,
|
||||
tables: tables,
|
||||
db : db,
|
||||
tables : tables,
|
||||
}
|
||||
}
|
||||
|
||||
@ -41,6 +42,19 @@ func (db *Db) From(tables string) (*DbOp) {
|
||||
return db.Table(tables)
|
||||
}
|
||||
|
||||
// (事务)链式操作,数据表字段,可支持多个表,以半角逗号连接
|
||||
func (tx *Tx) Table(tables string) (*DbOp) {
|
||||
return &DbOp {
|
||||
tx : tx,
|
||||
tables : tables,
|
||||
}
|
||||
}
|
||||
|
||||
// (事务)链式操作,数据表字段,可支持多个表,以半角逗号连接
|
||||
func (tx *Tx) From(tables string) (*DbOp) {
|
||||
return tx.Table(tables)
|
||||
}
|
||||
|
||||
// 链式操作,左联表
|
||||
func (op *DbOp) LeftJoin(joinTable string, on string) (*DbOp) {
|
||||
op.tables += fmt.Sprintf(" LEFT JOIN %s ON (%s)", joinTable, on)
|
||||
@ -105,14 +119,22 @@ func (op *DbOp) Insert() (sql.Result, error) {
|
||||
if op.batch > 0 {
|
||||
batch = op.batch
|
||||
}
|
||||
return op.link.BatchInsert(op.tables, list, batch)
|
||||
if op.tx == nil {
|
||||
return op.db.BatchInsert(op.tables, list, batch)
|
||||
} else {
|
||||
return op.tx.BatchInsert(op.tables, list, batch)
|
||||
}
|
||||
}
|
||||
// 记录操作
|
||||
if op.data == nil {
|
||||
return nil, errors.New("inserting into table with empty data")
|
||||
}
|
||||
if d, ok := op.data.(Map); ok {
|
||||
return op.link.Insert(op.tables, d)
|
||||
if dataMap, ok := op.data.(Map); ok {
|
||||
if op.tx == nil {
|
||||
return op.db.Insert(op.tables, dataMap)
|
||||
} else {
|
||||
return op.tx.Insert(op.tables, dataMap)
|
||||
}
|
||||
}
|
||||
return nil, errors.New("inserting into table with invalid data type")
|
||||
}
|
||||
@ -125,14 +147,22 @@ func (op *DbOp) Replace() (sql.Result, error) {
|
||||
if op.batch > 0 {
|
||||
batch = op.batch
|
||||
}
|
||||
return op.link.BatchReplace(op.tables, list, batch)
|
||||
if op.tx == nil {
|
||||
return op.db.BatchReplace(op.tables, list, batch)
|
||||
} else {
|
||||
return op.tx.BatchReplace(op.tables, list, batch)
|
||||
}
|
||||
}
|
||||
// 记录操作
|
||||
if op.data == nil {
|
||||
return nil, errors.New("replacing into table with empty data")
|
||||
}
|
||||
if d, ok := op.data.(Map); ok {
|
||||
return op.link.Insert(op.tables, d)
|
||||
if dataMap, ok := op.data.(Map); ok {
|
||||
if op.tx == nil {
|
||||
return op.db.Insert(op.tables, dataMap)
|
||||
} else {
|
||||
return op.tx.Insert(op.tables, dataMap)
|
||||
}
|
||||
}
|
||||
return nil, errors.New("replacing into table with invalid data type")
|
||||
}
|
||||
@ -145,14 +175,22 @@ func (op *DbOp) Save() (sql.Result, error) {
|
||||
if op.batch > 0 {
|
||||
batch = op.batch
|
||||
}
|
||||
return op.link.BatchSave(op.tables, list, batch)
|
||||
if op.tx == nil {
|
||||
return op.db.BatchSave(op.tables, list, batch)
|
||||
} else {
|
||||
return op.tx.BatchSave(op.tables, list, batch)
|
||||
}
|
||||
}
|
||||
// 记录操作
|
||||
if op.data == nil {
|
||||
return nil, errors.New("saving into table with empty data")
|
||||
}
|
||||
if d, ok := op.data.(Map); ok {
|
||||
return op.link.Save(op.tables, d)
|
||||
if dataMap, ok := op.data.(Map); ok {
|
||||
if op.tx == nil {
|
||||
return op.db.Save(op.tables, dataMap)
|
||||
} else {
|
||||
return op.tx.Save(op.tables, dataMap)
|
||||
}
|
||||
}
|
||||
return nil, errors.New("saving into table with invalid data type")
|
||||
}
|
||||
@ -162,7 +200,11 @@ func (op *DbOp) Update() (sql.Result, error) {
|
||||
if op.data == nil {
|
||||
return nil, errors.New("updating table with empty data")
|
||||
}
|
||||
return op.link.Update(op.tables, op.data, op.where, op.whereArgs ...)
|
||||
if op.tx == nil {
|
||||
return op.db.Update(op.tables, op.data, op.where, op.whereArgs ...)
|
||||
} else {
|
||||
return op.tx.Update(op.tables, op.data, op.where, op.whereArgs ...)
|
||||
}
|
||||
}
|
||||
|
||||
// 链式操作, CURD - Delete
|
||||
@ -170,7 +212,11 @@ func (op *DbOp) Delete() (sql.Result, error) {
|
||||
if op.where == "" {
|
||||
return nil, errors.New("where is required while deleting")
|
||||
}
|
||||
return op.link.Delete(op.tables, op.where, op.whereArgs...)
|
||||
if op.tx == nil {
|
||||
return op.db.Delete(op.tables, op.where, op.whereArgs...)
|
||||
} else {
|
||||
return op.tx.Delete(op.tables, op.where, op.whereArgs...)
|
||||
}
|
||||
}
|
||||
|
||||
// 设置批处理的大小
|
||||
@ -197,7 +243,11 @@ func (op *DbOp) Select() (List, error) {
|
||||
if op.limit != 0 {
|
||||
s += fmt.Sprintf(" LIMIT %d, %d", op.start, op.limit)
|
||||
}
|
||||
return op.link.GetAll(s, op.whereArgs...)
|
||||
if op.tx == nil {
|
||||
return op.db.GetAll(s, op.whereArgs...)
|
||||
} else {
|
||||
return op.tx.GetAll(s, op.whereArgs...)
|
||||
}
|
||||
}
|
||||
|
||||
// 链式操作,查询所有记录
|
||||
|
||||
@ -9,42 +9,26 @@ package gdb
|
||||
import (
|
||||
"fmt"
|
||||
"errors"
|
||||
"strings"
|
||||
"database/sql"
|
||||
"gitee.com/johng/gf/g/os/glog"
|
||||
"gitee.com/johng/gf/g/os/gcache"
|
||||
"gitee.com/johng/gf/g/util/grand"
|
||||
_ "github.com/lib/pq"
|
||||
_ "github.com/go-sql-driver/mysql"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// 数据库事务对象
|
||||
type Tx struct {
|
||||
db Db
|
||||
db *Db
|
||||
tx *sql.Tx
|
||||
}
|
||||
|
||||
//Query(q string, args ...interface{}) (*sql.Rows, error)
|
||||
//Exec(q string, args ...interface{}) (sql.Result, error)
|
||||
//Prepare(q string) (*sql.Stmt, error)
|
||||
//
|
||||
//GetAll(q string, args ...interface{}) (List, error)
|
||||
//GetOne(q string, args ...interface{}) (Map, error)
|
||||
//GetValue(q string, args ...interface{}) (interface{}, error)
|
||||
//
|
||||
//
|
||||
//Insert(table string, data Map) (sql.Result, error)
|
||||
//Replace(table string, data Map) (sql.Result, error)
|
||||
//Save(table string, data Map) (sql.Result, error)
|
||||
//
|
||||
//BatchInsert(table string, list List, batch int) (sql.Result, error)
|
||||
//BatchReplace(table string, list List, batch int) (sql.Result, error)
|
||||
//BatchSave(table string, list List, batch int) (sql.Result, error)
|
||||
//
|
||||
//Update(table string, data interface{}, condition interface{}, args ...interface{}) (sql.Result, error)
|
||||
//Delete(table string, condition interface{}, args ...interface{}) (sql.Result, error)
|
||||
//
|
||||
//Table(tables string) (*gLinkOp)
|
||||
//From(tables string) (*gLinkOp)
|
||||
// 事务操作,提交
|
||||
func (tx *Tx) Commit() error {
|
||||
return tx.tx.Commit()
|
||||
}
|
||||
|
||||
// 事务操作,回滚
|
||||
func (tx *Tx) Rollback() error {
|
||||
return tx.tx.Rollback()
|
||||
}
|
||||
|
||||
// 数据库sql查询操作,主要执行查询
|
||||
@ -145,29 +129,29 @@ func (tx *Tx) insert(table string, data Map, option uint8) (sql.Result, error) {
|
||||
if option == OPTION_SAVE {
|
||||
var updates []string
|
||||
for k, _ := range data {
|
||||
updates = append(updates, fmt.Sprintf("%s%s%s=VALUES(%s)", db.charl, k, db.charr, k))
|
||||
updates = append(updates, fmt.Sprintf("%s%s%s=VALUES(%s)", tx.db.charl, k, tx.db.charr, k))
|
||||
}
|
||||
updatestr = fmt.Sprintf(" ON DUPLICATE KEY UPDATE %s", strings.Join(updates, ","))
|
||||
}
|
||||
return tx.Exec(
|
||||
fmt.Sprintf("%s INTO %s%s%s(%s) VALUES(%s) %s",
|
||||
operation, tx.db.charl, table, db.charr, strings.Join(keys, ","), strings.Join(values, ","), updatestr), params...
|
||||
operation, tx.db.charl, table, tx.db.charr, strings.Join(keys, ","), strings.Join(values, ","), updatestr), params...
|
||||
)
|
||||
}
|
||||
|
||||
// CURD操作:单条数据写入, 仅仅执行写入操作,如果存在冲突的主键或者唯一索引,那么报错返回
|
||||
func (tx *Tx) Insert(table string, data Map) (sql.Result, error) {
|
||||
return db.link.insert(table, data, OPTION_INSERT)
|
||||
return tx.insert(table, data, OPTION_INSERT)
|
||||
}
|
||||
|
||||
// CURD操作:单条数据写入, 如果数据存在(主键或者唯一索引),那么删除后重新写入一条
|
||||
func (tx *Tx) Replace(table string, data Map) (sql.Result, error) {
|
||||
return db.link.insert(table, data, OPTION_REPLACE)
|
||||
return tx.insert(table, data, OPTION_REPLACE)
|
||||
}
|
||||
|
||||
// CURD操作:单条数据写入, 如果数据存在(主键或者唯一索引),那么更新,否则写入一条新数据
|
||||
func (tx *Tx) Save(table string, data Map) (sql.Result, error) {
|
||||
return db.link.insert(table, data, OPTION_SAVE)
|
||||
return tx.insert(table, data, OPTION_SAVE)
|
||||
}
|
||||
|
||||
// 批量写入数据
|
||||
@ -187,14 +171,14 @@ func (tx *Tx) batchInsert(table string, list List, batch int, option uint8) (sql
|
||||
keys = append(keys, k)
|
||||
values = append(values, "?")
|
||||
}
|
||||
var kstr = db.charl + strings.Join(keys, db.charl + "," + db.charr) + db.charr
|
||||
var kstr = tx.db.charl + strings.Join(keys, tx.db.charl + "," + tx.db.charr) + tx.db.charr
|
||||
// 操作判断
|
||||
operation := db.getInsertOperationByOption(option)
|
||||
operation := tx.db.getInsertOperationByOption(option)
|
||||
updatestr := ""
|
||||
if option == OPTION_SAVE {
|
||||
var updates []string
|
||||
for _, k := range keys {
|
||||
updates = append(updates, fmt.Sprintf("%s=VALUES(%s)", db.charl, k, db.charr, k))
|
||||
updates = append(updates, fmt.Sprintf("%s=VALUES(%s)", tx.db.charl, k, tx.db.charr, k))
|
||||
}
|
||||
updatestr = fmt.Sprintf(" ON DUPLICATE KEY UPDATE %s", strings.Join(updates, ","))
|
||||
}
|
||||
@ -205,7 +189,9 @@ func (tx *Tx) batchInsert(table string, list List, batch int, option uint8) (sql
|
||||
}
|
||||
bvalues = append(bvalues, "(" + strings.Join(values, ",") + ")")
|
||||
if len(bvalues) == batch {
|
||||
r, err := db.Exec(fmt.Sprintf("%s INTO %s%s%s(%s) VALUES%s %s", operation, db.charl, table, db.charr, kstr, strings.Join(bvalues, ","), updatestr), params...)
|
||||
r, err := tx.Exec(fmt.Sprintf("%s INTO %s%s%s(%s) VALUES%s %s",
|
||||
operation, tx.db.charl, table, tx.db.charr, kstr, strings.Join(bvalues, ","), updatestr),
|
||||
params...)
|
||||
if err != nil {
|
||||
return result, err
|
||||
}
|
||||
@ -215,7 +201,9 @@ func (tx *Tx) batchInsert(table string, list List, batch int, option uint8) (sql
|
||||
}
|
||||
// 处理最后不构成指定批量的数据
|
||||
if len(bvalues) > 0 {
|
||||
r, err := db.Exec(fmt.Sprintf("%s INTO %s%s%s(%s) VALUES%s %s", operation, db.charl, table, db.charr, kstr, strings.Join(bvalues, ","), updatestr), params...)
|
||||
r, err := tx.Exec(fmt.Sprintf("%s INTO %s%s%s(%s) VALUES%s %s",
|
||||
operation, tx.db.charl, table, tx.db.charr, kstr, strings.Join(bvalues, ","), updatestr),
|
||||
params...)
|
||||
if err != nil {
|
||||
return result, err
|
||||
}
|
||||
@ -226,17 +214,17 @@ func (tx *Tx) batchInsert(table string, list List, batch int, option uint8) (sql
|
||||
|
||||
// CURD操作:批量数据指定批次量写入
|
||||
func (tx *Tx) BatchInsert(table string, list List, batch int) (sql.Result, error) {
|
||||
return db.link.batchInsert(table, list, batch, OPTION_INSERT)
|
||||
return tx.batchInsert(table, list, batch, OPTION_INSERT)
|
||||
}
|
||||
|
||||
// CURD操作:批量数据指定批次量写入, 如果数据存在(主键或者唯一索引),那么删除后重新写入一条
|
||||
func (tx *Tx) BatchReplace(table string, list List, batch int) (sql.Result, error) {
|
||||
return db.link.batchInsert(table, list, batch, OPTION_REPLACE)
|
||||
return tx.batchInsert(table, list, batch, OPTION_REPLACE)
|
||||
}
|
||||
|
||||
// CURD操作:批量数据指定批次量写入, 如果数据存在(主键或者唯一索引),那么更新,否则写入一条新数据
|
||||
func (tx *Tx) BatchSave(table string, list List, batch int) (sql.Result, error) {
|
||||
return db.link.batchInsert(table, list, batch, OPTION_SAVE)
|
||||
return tx.batchInsert(table, list, batch, OPTION_SAVE)
|
||||
}
|
||||
|
||||
// CURD操作:数据更新,统一采用sql预处理
|
||||
@ -245,18 +233,18 @@ func (tx *Tx) Update(table string, data interface{}, condition interface{}, args
|
||||
var params []interface{}
|
||||
var updates string
|
||||
switch data.(type) {
|
||||
case string:
|
||||
updates = data.(string)
|
||||
case Map:
|
||||
var keys []string
|
||||
for k, v := range data.(Map) {
|
||||
keys = append(keys, fmt.Sprintf("%s%s%s=?", db.charl, k, db.charr))
|
||||
params = append(params, v)
|
||||
}
|
||||
updates = strings.Join(keys, ",")
|
||||
case string:
|
||||
updates = data.(string)
|
||||
case Map:
|
||||
var keys []string
|
||||
for k, v := range data.(Map) {
|
||||
keys = append(keys, fmt.Sprintf("%s%s%s=?", tx.db.charl, k, tx.db.charr))
|
||||
params = append(params, v)
|
||||
}
|
||||
updates = strings.Join(keys, ",")
|
||||
|
||||
default:
|
||||
return nil, errors.New("invalid data type for 'data' field, string or Map expected")
|
||||
default:
|
||||
return nil, errors.New("invalid data type for 'data' field, string or Map expected")
|
||||
}
|
||||
for _, v := range args {
|
||||
if r, ok := v.(string); ok {
|
||||
@ -267,10 +255,10 @@ func (tx *Tx) Update(table string, data interface{}, condition interface{}, args
|
||||
|
||||
}
|
||||
}
|
||||
return db.Exec(fmt.Sprintf("UPDATE %s%s%s SET %s WHERE %s", db.charl, table, db.charr, updates, condition), params...)
|
||||
return tx.Exec(fmt.Sprintf("UPDATE %s%s%s SET %s WHERE %s", tx.db.charl, table, tx.db.charr, updates, condition), params...)
|
||||
}
|
||||
|
||||
// CURD操作:删除数据
|
||||
func (tx *Tx) Delete(table string, condition interface{}, args ...interface{}) (sql.Result, error) {
|
||||
return db.Exec(fmt.Sprintf("DELETE FROM %s WHERE %s", db.charl, table, db.charr, condition), args...)
|
||||
return tx.Exec(fmt.Sprintf("DELETE FROM %s WHERE %s", tx.db.charl, table, tx.db.charr, condition), args...)
|
||||
}
|
||||
|
||||
@ -377,7 +377,7 @@ func linkopBatchSave() {
|
||||
func transaction1() {
|
||||
fmt.Println("transaction1:")
|
||||
if tx, err := db.Begin(); err == nil {
|
||||
r, err := db.Save("user", gdb.Map{
|
||||
r, err := tx.Save("user", gdb.Map{
|
||||
"uid" : 1,
|
||||
"name" : "john",
|
||||
})
|
||||
@ -391,7 +391,7 @@ func transaction1() {
|
||||
func transaction2() {
|
||||
fmt.Println("transaction2:")
|
||||
if tx, err := db.Begin(); err == nil {
|
||||
r, err := db.Table("user").Data(gdb.Map{"uid":1, "name": "john_1"}).Save()
|
||||
r, err := tx.Table("user").Data(gdb.Map{"uid":1, "name": "john_1"}).Save()
|
||||
tx.Commit()
|
||||
fmt.Println(r, err)
|
||||
}
|
||||
@ -455,6 +455,6 @@ func main() {
|
||||
//linkopUpdate2()
|
||||
//linkopUpdate3()
|
||||
//keepPing()
|
||||
transaction1()
|
||||
//transaction2()
|
||||
//transaction1()
|
||||
transaction2()
|
||||
}
|
||||
Reference in New Issue
Block a user