gdb updates, add batch operation support for Insert/Save/Replace, change list param type from List to interface{} for Batch* functions

This commit is contained in:
John
2019-02-27 09:38:10 +08:00
parent 9afe242293
commit ef34b2c9ce
5 changed files with 118 additions and 67 deletions

View File

@ -37,8 +37,8 @@ type DB interface {
doQuery(link dbLink, query string, args ...interface{}) (rows *sql.Rows, err error)
doExec(link dbLink, query string, args ...interface{}) (result sql.Result, err error)
doPrepare(link dbLink, query string) (*sql.Stmt, error)
doInsert(link dbLink, table string, data interface{}, option int) (result sql.Result, err error)
doBatchInsert(link dbLink, table string, list List, batch int, option int) (result sql.Result, err error)
doInsert(link dbLink, table string, data interface{}, option int, batch...int) (result sql.Result, err error)
doBatchInsert(link dbLink, table string, list interface{}, option int, batch...int) (result sql.Result, err error)
doUpdate(link dbLink, table string, data interface{}, condition interface{}, args ...interface{}) (result sql.Result, err error)
doDelete(link dbLink, table string, condition interface{}, args ...interface{}) (result sql.Result, err error)
@ -61,14 +61,14 @@ type DB interface {
Begin() (*TX, error)
// 数据表插入/更新/保存操作
Insert(table string, data interface{}) (sql.Result, error)
Replace(table string, data interface{}) (sql.Result, error)
Save(table string, data interface{}) (sql.Result, error)
Insert(table string, data interface{}, batch...int) (sql.Result, error)
Replace(table string, data interface{}, batch...int) (sql.Result, error)
Save(table string, data interface{}, batch...int) (sql.Result, error)
// 数据表插入/更新/保存操作(批量)
BatchInsert(table string, list List, batch int) (sql.Result, error)
BatchReplace(table string, list List, batch int) (sql.Result, error)
BatchSave(table string, list List, batch int) (sql.Result, error)
BatchInsert(table string, list interface{}, batch...int) (sql.Result, error)
BatchReplace(table string, list interface{}, batch...int) (sql.Result, error)
BatchSave(table string, list interface{}, batch...int) (sql.Result, error)
// 数据修改/删除
Update(table string, data interface{}, condition interface{}, args ...interface{}) (sql.Result, error)
@ -149,8 +149,11 @@ const (
OPTION_REPLACE = 1
OPTION_SAVE = 2
OPTION_IGNORE = 3
// 默认批量操作的数量值(Batch*操作)
gDEFAULT_BATCH_NUM = 10
// 默认的连接池连接存活时间(秒)
gDEFAULT_CONN_MAX_LIFE_TIME = 30
)
// 使用默认/指定分组配置进行连接数据库集群配置项default

View File

@ -235,35 +235,53 @@ func (bs *dbBase) Begin() (*TX, error) {
}
// CURD操作:单条数据写入, 仅仅执行写入操作,如果存在冲突的主键或者唯一索引,那么报错返回
func (bs *dbBase) Insert(table string, data interface{}) (sql.Result, error) {
return bs.db.doInsert(nil, table, data, OPTION_INSERT)
func (bs *dbBase) Insert(table string, data interface{}, batch...int) (sql.Result, error) {
return bs.db.doInsert(nil, table, data, OPTION_INSERT, batch...)
}
// CURD操作:单条数据写入, 如果数据存在(主键或者唯一索引),那么删除后重新写入一条
func (bs *dbBase) Replace(table string, data interface{}) (sql.Result, error) {
return bs.db.doInsert(nil, table, data, OPTION_REPLACE)
func (bs *dbBase) Replace(table string, data interface{}, batch...int) (sql.Result, error) {
return bs.db.doInsert(nil, table, data, OPTION_REPLACE, batch...)
}
// CURD操作:单条数据写入, 如果数据存在(主键或者唯一索引),那么更新,否则写入一条新数据
func (bs *dbBase) Save(table string, data interface{}) (sql.Result, error) {
return bs.db.doInsert(nil, table, data, OPTION_SAVE)
func (bs *dbBase) Save(table string, data interface{}, batch...int) (sql.Result, error) {
return bs.db.doInsert(nil, table, data, OPTION_SAVE, batch...)
}
// insert、replace, save ignore操作。
// 支持insert、replace, save ignore操作。
// 0: insert: 仅仅执行写入操作,如果存在冲突的主键或者唯一索引,那么报错返回;
// 1: replace: 如果数据存在(主键或者唯一索引),那么删除后重新写入一条;
// 2: save: 如果数据存在(主键或者唯一索引),那么更新,否则写入一条新数据;
// 3: ignore: 如果数据存在(主键或者唯一索引),那么什么也不做;
//
// 参数data支持任意map类型,或者struct类型
func (bs *dbBase) doInsert(link dbLink, table string, data interface{}, option int) (result sql.Result, err error) {
var fields []string
var values []string
var params []interface{}
// 参数data支持map/struct/*struct/slice类型
// 当为slice(例如[]map/[]struct/[]*struct)类型时batch参数生效并自动切换为批量操作。
func (bs *dbBase) doInsert(link dbLink, table string, data interface{}, option int, batch...int) (result sql.Result, err error) {
var fields []string
var values []string
var params []interface{}
var dataMap Map
// 使用反射判断data数据类型如果为slice类型那么自动转为批量操作
rv := reflect.ValueOf(data)
kind := rv.Kind()
if kind == reflect.Ptr {
rv = rv.Elem()
kind = rv.Kind()
}
switch kind {
case reflect.Slice: fallthrough
case reflect.Array:
return bs.db.doBatchInsert(link, table, data, option, batch...)
case reflect.Map: fallthrough
case reflect.Struct:
dataMap = Map(gconv.Map(data))
default:
return result, errors.New(fmt.Sprint("unsupported data type:", kind))
}
charL, charR := bs.db.getChars()
dataMap := gconv.Map(data)
for k, v := range dataMap {
fields = append(fields, charL + k + charR)
fields = append(fields, charL + k + charR)
values = append(values, "?")
params = append(params, v)
}
@ -293,28 +311,55 @@ func (bs *dbBase) doInsert(link dbLink, table string, data interface{}, option i
}
// CURD操作:批量数据指定批次量写入
func (bs *dbBase) BatchInsert(table string, list List, batch int) (sql.Result, error) {
return bs.db.doBatchInsert(nil, table, list, batch, OPTION_INSERT)
func (bs *dbBase) BatchInsert(table string, list interface{}, batch...int) (sql.Result, error) {
return bs.db.doBatchInsert(nil, table, list, OPTION_INSERT, batch...)
}
// CURD操作:批量数据指定批次量写入, 如果数据存在(主键或者唯一索引),那么删除后重新写入一条
func (bs *dbBase) BatchReplace(table string, list List, batch int) (sql.Result, error) {
return bs.db.doBatchInsert(nil, table, list, batch, OPTION_REPLACE)
func (bs *dbBase) BatchReplace(table string, list interface{}, batch...int) (sql.Result, error) {
return bs.db.doBatchInsert(nil, table, list, OPTION_REPLACE, batch...)
}
// CURD操作:批量数据指定批次量写入, 如果数据存在(主键或者唯一索引),那么更新,否则写入一条新数据
func (bs *dbBase) BatchSave(table string, list List, batch int) (sql.Result, error) {
return bs.db.doBatchInsert(nil, table, list, batch, OPTION_SAVE)
func (bs *dbBase) BatchSave(table string, list interface{}, batch...int) (sql.Result, error) {
return bs.db.doBatchInsert(nil, table, list, OPTION_SAVE, batch...)
}
// 批量写入数据
func (bs *dbBase) doBatchInsert(link dbLink, table string, list List, batch int, option int) (result sql.Result, err error) {
var keys []string
var values []string
var bvalues []string
var params []interface{}
// 批量写入数据, 参数list支持slice类型例如: []map/[]struct/[]*struct。
func (bs *dbBase) doBatchInsert(link dbLink, table string, list interface{}, option int, batch...int) (result sql.Result, err error) {
var keys []string
var values []string
var params []interface{}
listMap := (List)(nil)
switch v := list.(type) {
case List:
listMap = v
case Map:
listMap = List{v}
default:
rv := reflect.ValueOf(list)
kind := rv.Kind()
if kind == reflect.Ptr {
rv = rv.Elem()
kind = rv.Kind()
}
switch kind {
// 如果是slice那么转换为List类型
case reflect.Slice: fallthrough
case reflect.Array:
listMap = make(List, rv.Len())
for i := 0; i < rv.Len(); i++ {
listMap[i] = gconv.Map(rv.Index(i).Interface())
}
case reflect.Map: fallthrough
case reflect.Struct:
listMap = List{Map(gconv.Map(list))}
default:
return result, errors.New(fmt.Sprint("unsupported list type:", kind))
}
}
// 判断长度
if len(list) < 1 {
if len(listMap) < 1 {
return result, errors.New("empty data list")
}
if link == nil {
@ -323,14 +368,15 @@ func (bs *dbBase) doBatchInsert(link dbLink, table string, list List, batch int,
}
}
// 首先获取字段名称及记录长度
for k, _ := range list[0] {
keys = append(keys, k)
values = append(values, "?")
holders := []string(nil)
for k, _ := range listMap[0] {
keys = append(keys, k)
holders = append(holders, "?")
}
batchResult := new(batchSqlResult)
charL, charR := bs.db.getChars()
keyStr := charL + strings.Join(keys, charL + "," + charR) + charR
valueHolderStr := "(" + strings.Join(values, ",") + ")"
valueHolderStr := "(" + strings.Join(holders, ",") + ")"
// 操作判断
operation := getInsertOperationByOption(option)
updateStr := ""
@ -347,14 +393,18 @@ func (bs *dbBase) doBatchInsert(link dbLink, table string, list List, batch int,
updateStr = fmt.Sprintf(" ON DUPLICATE KEY UPDATE %s", strings.Join(updates, ","))
}
// 构造批量写入数据格式(注意map的遍历是无序的)
for i := 0; i < len(list); i++ {
batchNum := gDEFAULT_BATCH_NUM
if len(batch) > 0 {
batchNum = batch[0]
}
for i := 0; i < len(listMap); i++ {
for _, k := range keys {
params = append(params, list[i][k])
params = append(params, listMap[i][k])
}
bvalues = append(bvalues, valueHolderStr)
if len(bvalues) == batch {
values = append(values, valueHolderStr)
if len(values) == batchNum {
r, err := bs.db.doExec(link, fmt.Sprintf("%s INTO %s(%s) VALUES%s %s",
operation, table, keyStr, strings.Join(bvalues, ","),
operation, table, keyStr, strings.Join(values, ","),
updateStr),
params...)
if err != nil {
@ -366,14 +416,14 @@ func (bs *dbBase) doBatchInsert(link dbLink, table string, list List, batch int,
batchResult.lastResult = r
batchResult.rowsAffected += n
}
params = params[:0]
bvalues = bvalues[:0]
params = params[:0]
values = values[:0]
}
}
// 处理最后不构成指定批量的数据
if len(bvalues) > 0 {
if len(values) > 0 {
r, err := bs.db.doExec(link, fmt.Sprintf("%s INTO %s(%s) VALUES%s %s",
operation, table, keyStr, strings.Join(bvalues, ","),
operation, table, keyStr, strings.Join(values, ","),
updateStr),
params...)
if err != nil {
@ -389,8 +439,8 @@ func (bs *dbBase) doBatchInsert(link dbLink, table string, list List, batch int,
return batchResult, nil
}
// CURD操作:数据更新统一采用sql预处理
// data参数支持字符串或者关联数组类型,内部会自行做判断处理
// CURD操作:数据更新统一采用sql预处理
// data参数支持string/map/struct/*struct类型。
func (bs *dbBase) Update(table string, data interface{}, condition interface{}, args ...interface{}) (sql.Result, error) {
link, err := bs.db.Master()
if err != nil {
@ -399,8 +449,8 @@ func (bs *dbBase) Update(table string, data interface{}, condition interface{},
return bs.db.doUpdate(link, table, data, condition, args ...)
}
// CURD操作:数据更新统一采用sql预处理
// data参数支持字符串或者关联数组类型,内部会自行做判断处理
// CURD操作:数据更新统一采用sql预处理
// data参数支持string/map/struct/*struct类型类型。
func (bs *dbBase) doUpdate(link dbLink, table string, data interface{}, condition interface{}, args ...interface{}) (result sql.Result, err error) {
params := ([]interface{})(nil)
updates := ""

View File

@ -333,7 +333,7 @@ func (md *Model) Save() (result sql.Result, err error) {
}
// 批量操作
if list, ok := md.data.(List); ok {
batch := 10
batch := gDEFAULT_BATCH_NUM
if md.batch > 0 {
batch = md.batch
}

View File

@ -100,33 +100,33 @@ func (tx *TX) GetCount(query string, args ...interface{}) (int, error) {
}
// CURD操作:单条数据写入, 仅仅执行写入操作,如果存在冲突的主键或者唯一索引,那么报错返回
func (tx *TX) Insert(table string, data interface{}) (sql.Result, error) {
return tx.db.doInsert(tx.tx, table, data, OPTION_INSERT)
func (tx *TX) Insert(table string, data interface{}, batch...int) (sql.Result, error) {
return tx.db.doInsert(tx.tx, table, data, OPTION_INSERT, batch...)
}
// CURD操作:单条数据写入, 如果数据存在(主键或者唯一索引),那么删除后重新写入一条
func (tx *TX) Replace(table string, data interface{}) (sql.Result, error) {
return tx.db.doInsert(tx.tx, table, data, OPTION_REPLACE)
func (tx *TX) Replace(table string, data interface{}, batch...int) (sql.Result, error) {
return tx.db.doInsert(tx.tx, table, data, OPTION_REPLACE, batch...)
}
// CURD操作:单条数据写入, 如果数据存在(主键或者唯一索引),那么更新,否则写入一条新数据
func (tx *TX) Save(table string, data interface{}) (sql.Result, error) {
return tx.db.doInsert(tx.tx, table, data, OPTION_SAVE)
func (tx *TX) Save(table string, data interface{}, batch...int) (sql.Result, error) {
return tx.db.doInsert(tx.tx, table, data, OPTION_SAVE, batch...)
}
// CURD操作:批量数据指定批次量写入
func (tx *TX) BatchInsert(table string, list List, batch int) (sql.Result, error) {
return tx.db.doBatchInsert(tx.tx, table, list, batch, OPTION_INSERT)
func (tx *TX) BatchInsert(table string, list interface{}, batch...int) (sql.Result, error) {
return tx.db.doBatchInsert(tx.tx, table, list, OPTION_INSERT, batch...)
}
// CURD操作:批量数据指定批次量写入, 如果数据存在(主键或者唯一索引),那么删除后重新写入一条
func (tx *TX) BatchReplace(table string, list List, batch int) (sql.Result, error) {
return tx.db.doBatchInsert(tx.tx, table, list, batch, OPTION_REPLACE)
func (tx *TX) BatchReplace(table string, list interface{}, batch...int) (sql.Result, error) {
return tx.db.doBatchInsert(tx.tx, table, list, OPTION_REPLACE, batch...)
}
// CURD操作:批量数据指定批次量写入, 如果数据存在(主键或者唯一索引),那么更新,否则写入一条新数据
func (tx *TX) BatchSave(table string, list List, batch int) (sql.Result, error) {
return tx.db.doBatchInsert(tx.tx, table, list, batch, OPTION_SAVE)
func (tx *TX) BatchSave(table string, list interface{}, batch...int) (sql.Result, error) {
return tx.db.doBatchInsert(tx.tx, table, list, OPTION_SAVE, batch...)
}
// CURD操作:数据更新统一采用sql预处理

View File

@ -79,7 +79,6 @@ func TestDbBase_Insert(t *testing.T) {
}
result, err = db.Insert("user", User{
Id : 3,
Uid : 3,
Passport : "t3",
Password : "25d55ad283aa400af464c76d713c07ad",
Nickname : "T3",
@ -96,7 +95,6 @@ func TestDbBase_Insert(t *testing.T) {
result, err = db.Insert("user", &User{
Id : 4,
Uid : 4,
Passport : "t4",
Password : "25d55ad283aa400af464c76d713c07ad",
Nickname : "T4",