diff --git a/g/database/gdb/gdb.go b/g/database/gdb/gdb.go index 428579e86..d5e6232d2 100644 --- a/g/database/gdb/gdb.go +++ b/g/database/gdb/gdb.go @@ -27,22 +27,54 @@ const ( // 数据库操作接口 type Link interface { + // 打开数据库连接,建立数据库操作对象 Open (c *ConfigNode) (*sql.DB, error) - Close() error + + // SQL操作方法 Query(q string, args ...interface{}) (*sql.Rows, error) Exec(q string, args ...interface{}) (sql.Result, error) Prepare(q string) (*sql.Stmt, error) + // 数据库查询 GetAll(q string, args ...interface{}) (List, error) GetOne(q string, args ...interface{}) (Map, error) GetValue(q string, args ...interface{}) (interface{}, error) + // Ping PingMaster() error PingSlave() error + // 连接属性设置 SetMaxIdleConns(n int) SetMaxOpenConns(n int) + // 开启事务操作 + Begin() (*sql.Tx, error) + + // 数据表插入/更新/保存操作 + Insert(table string, data Map) (sql.Result, error) + Replace(table string, data Map) (sql.Result, error) + Save(table string, data Map) (sql.Result, error) + + // 数据表插入/更新/保存操作(批量) + BatchInsert(table string, list List, batch int) (sql.Result, error) + BatchReplace(table string, list List, batch int) (sql.Result, error) + BatchSave(table string, list List, batch int) (sql.Result, error) + + // 数据修改/删除 + Update(table string, data interface{}, condition interface{}, args ...interface{}) (sql.Result, error) + Delete(table string, condition interface{}, args ...interface{}) (sql.Result, error) + + // 创建链式操作对象(Table为From的别名) + Table(tables string) (*DbOp) + From(tables string) (*DbOp) + + // 关闭数据库操作对象 + Close() error + + // 内部方法 + insert(table string, data Map, option uint8) (sql.Result, error) + batchInsert(table string, list List, batch int, option uint8) (sql.Result, error) setMaster(master *sql.DB) setSlave(slave *sql.DB) setQuoteChar(left string, right string) @@ -50,36 +82,15 @@ type Link interface { getQuoteCharLeft () string getQuoteCharRight () string handleSqlBeforeExec(q *string) *string - - Begin() (*sql.Tx, error) - Commit() error - Rollback() error - - insert(table string, data Map, option uint8) (sql.Result, error) - Insert(table string, data Map) (sql.Result, error) - Replace(table string, data Map) (sql.Result, error) - Save(table string, data Map) (sql.Result, error) - - batchInsert(table string, list List, batch int, option uint8) (sql.Result, error) - BatchInsert(table string, list List, batch int) (sql.Result, error) - BatchReplace(table string, list List, batch int) (sql.Result, error) - BatchSave(table string, list List, batch int) (sql.Result, error) - - Update(table string, data interface{}, condition interface{}, args ...interface{}) (sql.Result, error) - Delete(table string, condition interface{}, args ...interface{}) (sql.Result, error) - - Table(tables string) (*gLinkOp) - From(tables string) (*gLinkOp) } // 数据库链接对象 -type dbLink struct { - link Link - transaction *sql.Tx - master *sql.DB - slave *sql.DB - charl string - charr string +type Db struct { + link Link + master *sql.DB + slave *sql.DB + charl string + charr string } // 关联数组,绑定一条数据表记录 @@ -186,10 +197,10 @@ func newLink (masterNode *ConfigNode, slaveNode *ConfigNode) (Link, error) { var link Link switch masterNode.Type { case "mysql": - link = Link(&mysqlLink{}) + link = Link(&dbmysql{}) case "pgsql": - link = Link(&pgsqlLink{}) + link = Link(&dbpgsql{}) default: return nil, errors.New(fmt.Sprintf("unsupported db type '%s'", masterNode.Type)) @@ -213,23 +224,23 @@ func newLink (masterNode *ConfigNode, slaveNode *ConfigNode) (Link, error) { } // 设置master链接对象 -func (l *dbLink) setMaster(master *sql.DB) { - l.master = master +func (db *Db) setMaster(master *sql.DB) { + db.master = master } // 设置slave链接对象 -func (l *dbLink) setSlave(slave *sql.DB) { - l.slave = slave +func (db *Db) setSlave(slave *sql.DB) { + db.slave = slave } // 设置当前数据库类型引用字符 -func (l *dbLink) setQuoteChar(left string, right string) { - l.charl = left - l.charr = right +func (db *Db) setQuoteChar(left string, right string) { + db.charl = left + db.charr = right } // 设置挡脸操作的link接口 -func (l *dbLink) setLink(link Link) { - l.link = link +func (db *Db) setLink(link Link) { + db.link = link } diff --git a/g/database/gdb/gdb_base.go b/g/database/gdb/gdb_base.go index 85eba7c70..9be31a0a5 100644 --- a/g/database/gdb/gdb_base.go +++ b/g/database/gdb/gdb_base.go @@ -16,20 +16,20 @@ import ( ) // 关闭链接 -func (l *dbLink) Close() error { - if l.master != nil { - err := l.master.Close() - if (err == nil) { - l.master = nil +func (db *Db) Close() error { + if db.master != nil { + err := db.master.Close() + if err == nil { + db.master = nil } else { glog.Fatal(err) return err } } - if l.slave != nil { - err := l.slave.Close() - if (err == nil) { - l.slave = nil + if db.slave != nil { + err := db.slave.Close() + if err == nil { + db.slave = nil } else { glog.Fatal(err) return err @@ -39,44 +39,44 @@ func (l *dbLink) Close() error { } // 数据库sql查询操作,主要执行查询 -func (l *dbLink) Query(q string, args ...interface{}) (*sql.Rows, error) { - p := l.link.handleSqlBeforeExec(&q) - rows, err := l.slave.Query(*p, args ...) - err = l.formatError(err, p, args...) - if (err == nil) { +func (db *Db) Query(q string, args ...interface{}) (*sql.Rows, error) { + p := db.link.handleSqlBeforeExec(&q) + rows, err := db.slave.Query(*p, args ...) + err = db.formatError(err, p, args...) + if err == nil { return rows, nil } return nil, err } // 执行一条sql,并返回执行情况,主要用于非查询操作 -func (l *dbLink) Exec(q string, args ...interface{}) (sql.Result, error) { +func (db *Db) Exec(q string, args ...interface{}) (sql.Result, error) { //fmt.Println(q) //fmt.Println(args) - p := l.link.handleSqlBeforeExec(&q) - r, err := l.master.Exec(*p, args ...) - err = l.formatError(err, p, args...) + p := db.link.handleSqlBeforeExec(&q) + r, err := db.master.Exec(*p, args ...) + err = db.formatError(err, p, args...) return r, err } // 格式化错误信息 -func (l *dbLink) formatError(err error, q *string, args ...interface{}) error { +func (db *Db) formatError(err error, q *string, args ...interface{}) error { if err != nil { errstr := fmt.Sprintf("DB ERROR: %s\n", err.Error()) errstr += fmt.Sprintf("DB QUERY: %s\n", *q) if len(args) > 0 { errstr += fmt.Sprintf("DB PARAM: %v\n", args) } - err = errors.New(errstr) + err = errors.New(errstr) } return err } // 数据库查询,获取查询结果集,以列表结构返回 -func (l *dbLink) GetAll(q string, args ...interface{}) (List, error) { +func (db *Db) GetAll(q string, args ...interface{}) (List, error) { // 执行sql - rows, err := l.Query(q, args ...) + rows, err := db.Query(q, args ...) if err != nil || rows == nil { return nil, err } @@ -107,8 +107,8 @@ func (l *dbLink) GetAll(q string, args ...interface{}) (List, error) { } // 数据库查询,获取查询结果集,以关联数组结构返回 -func (l *dbLink) GetOne(q string, args ...interface{}) (Map, error) { - list, err := l.GetAll(q, args ...) +func (db *Db) GetOne(q string, args ...interface{}) (Map, error) { + list, err := db.GetAll(q, args ...) if err != nil { return nil, err } @@ -116,8 +116,8 @@ func (l *dbLink) GetOne(q string, args ...interface{}) (Map, error) { } // 数据库查询,获取查询字段值 -func (l *dbLink) GetValue(q string, args ...interface{}) (interface{}, error) { - one, err := l.GetOne(q, args ...) +func (db *Db) GetValue(q string, args ...interface{}) (interface{}, error) { + one, err := db.GetOne(q, args ...) if err != nil { return "", err } @@ -129,61 +129,39 @@ func (l *dbLink) GetValue(q string, args ...interface{}) (interface{}, error) { // sql预处理,执行完成后调用返回值sql.Stmt.Exec完成sql操作 // 记得调用sql.Stmt.Close关闭操作对象 -func (l *dbLink) Prepare(q string) (*sql.Stmt, error) { - return l.master.Prepare(q) +func (db *Db) Prepare(q string) (*sql.Stmt, error) { + return db.master.Prepare(q) } // ping一下,判断或保持数据库链接(master) -func (l *dbLink) PingMaster() error { - err := l.master.Ping(); +func (db *Db) PingMaster() error { + err := db.master.Ping(); return err } // ping一下,判断或保持数据库链接(slave) -func (l *dbLink) PingSlave() error { - err := l.slave.Ping(); +func (db *Db) PingSlave() error { + err := db.slave.Ping(); return err } // 设置数据库连接池中空闲链接的大小 -func (l *dbLink) SetMaxIdleConns(n int) { - l.master.SetMaxIdleConns(n); +func (db *Db) SetMaxIdleConns(n int) { + db.master.SetMaxIdleConns(n); } // 设置数据库连接池最大打开的链接数量 -func (l *dbLink) SetMaxOpenConns(n int) { - l.master.SetMaxOpenConns(n); +func (db *Db) SetMaxOpenConns(n int) { + db.master.SetMaxOpenConns(n); } // 事务操作,开启,会返回一个底层的事务操作对象链接如需要嵌套事务,那么可以使用该对象,否则请忽略 -func (l *dbLink) Begin() (*sql.Tx, error) { - tx, err := l.master.Begin() - if err == nil { - l.transaction = tx - } - return tx, err -} - -// 事务操作,提交 -func (l *dbLink) Commit() error { - if l.transaction == nil { - return errors.New("transaction not start") - } - err := l.transaction.Commit() - return err -} - -// 事务操作,回滚 -func (l *dbLink) Rollback() error { - if l.transaction == nil { - return errors.New("transaction not start") - } - err := l.transaction.Rollback() - return err +func (db *Db) Begin() (*sql.Tx, error) { + return db.master.Begin() } // 根据insert选项获得操作名称 -func (l *dbLink) getInsertOperationByOption(option uint8) string { +func (db *Db) getInsertOperationByOption(option uint8) string { oper := "INSERT" switch option { case OPTION_INSERT: @@ -201,47 +179,47 @@ func (l *dbLink) getInsertOperationByOption(option uint8) string { // 1: replace: 如果数据存在(主键或者唯一索引),那么删除后重新写入一条 // 2: save: 如果数据存在(主键或者唯一索引),那么更新,否则写入一条新数据 // 3: ignore: 如果数据存在(主键或者唯一索引),那么什么也不做 -func (l *dbLink) insert(table string, data Map, option uint8) (sql.Result, error) { +func (db *Db) insert(table string, data Map, option uint8) (sql.Result, error) { var keys []string var values []string var params []interface{} for k, v := range data { - keys = append(keys, l.charl + k + l.charr) + keys = append(keys, db.charl + k + db.charr) values = append(values, "?") params = append(params, v) } - operation := l.getInsertOperationByOption(option) + operation := db.getInsertOperationByOption(option) updatestr := "" if option == OPTION_SAVE { var updates []string for k, _ := range data { - updates = append(updates, fmt.Sprintf("%s%s%s=VALUES(%s)", l.charl, k, l.charr, k)) + updates = append(updates, fmt.Sprintf("%s%s%s=VALUES(%s)", db.charl, k, db.charr, k)) } updatestr = fmt.Sprintf(" ON DUPLICATE KEY UPDATE %s", strings.Join(updates, ",")) } - return l.Exec( + return db.Exec( fmt.Sprintf("%s INTO %s%s%s(%s) VALUES(%s) %s", - operation, l.charl, table, l.charr, strings.Join(keys, ","), strings.Join(values, ","), updatestr), params... + operation, db.charl, table, db.charr, strings.Join(keys, ","), strings.Join(values, ","), updatestr), params... ) } // CURD操作:单条数据写入, 仅仅执行写入操作,如果存在冲突的主键或者唯一索引,那么报错返回 -func (l *dbLink) Insert(table string, data Map) (sql.Result, error) { - return l.link.insert(table, data, OPTION_INSERT) +func (db *Db) Insert(table string, data Map) (sql.Result, error) { + return db.link.insert(table, data, OPTION_INSERT) } // CURD操作:单条数据写入, 如果数据存在(主键或者唯一索引),那么删除后重新写入一条 -func (l *dbLink) Replace(table string, data Map) (sql.Result, error) { - return l.link.insert(table, data, OPTION_REPLACE) +func (db *Db) Replace(table string, data Map) (sql.Result, error) { + return db.link.insert(table, data, OPTION_REPLACE) } // CURD操作:单条数据写入, 如果数据存在(主键或者唯一索引),那么更新,否则写入一条新数据 -func (l *dbLink) Save(table string, data Map) (sql.Result, error) { - return l.link.insert(table, data, OPTION_SAVE) +func (db *Db) Save(table string, data Map) (sql.Result, error) { + return db.link.insert(table, data, OPTION_SAVE) } // 批量写入数据 -func (l *dbLink) batchInsert(table string, list List, batch int, option uint8) (sql.Result, error) { +func (db *Db) batchInsert(table string, list List, batch int, option uint8) (sql.Result, error) { var keys []string var values []string var bvalues []string @@ -257,14 +235,14 @@ func (l *dbLink) batchInsert(table string, list List, batch int, option uint8) ( keys = append(keys, k) values = append(values, "?") } - var kstr = l.charl + strings.Join(keys, l.charl + "," + l.charr) + l.charr + var kstr = db.charl + strings.Join(keys, db.charl + "," + db.charr) + db.charr // 操作判断 - operation := l.getInsertOperationByOption(option) + operation := db.getInsertOperationByOption(option) updatestr := "" if option == OPTION_SAVE { var updates []string for _, k := range keys { - updates = append(updates, fmt.Sprintf("%s=VALUES(%s)", l.charl, k, l.charr, k)) + updates = append(updates, fmt.Sprintf("%s=VALUES(%s)", db.charl, k, db.charr, k)) } updatestr = fmt.Sprintf(" ON DUPLICATE KEY UPDATE %s", strings.Join(updates, ",")) } @@ -275,7 +253,7 @@ func (l *dbLink) batchInsert(table string, list List, batch int, option uint8) ( } bvalues = append(bvalues, "(" + strings.Join(values, ",") + ")") if len(bvalues) == batch { - r, err := l.Exec(fmt.Sprintf("%s INTO %s%s%s(%s) VALUES%s %s", operation, l.charl, table, l.charr, kstr, strings.Join(bvalues, ","), updatestr), params...) + r, err := db.Exec(fmt.Sprintf("%s INTO %s%s%s(%s) VALUES%s %s", operation, db.charl, table, db.charr, kstr, strings.Join(bvalues, ","), updatestr), params...) if err != nil { return result, err } @@ -285,7 +263,7 @@ func (l *dbLink) batchInsert(table string, list List, batch int, option uint8) ( } // 处理最后不构成指定批量的数据 if len(bvalues) > 0 { - r, err := l.Exec(fmt.Sprintf("%s INTO %s%s%s(%s) VALUES%s %s", operation, l.charl, table, l.charr, kstr, strings.Join(bvalues, ","), updatestr), params...) + r, err := db.Exec(fmt.Sprintf("%s INTO %s%s%s(%s) VALUES%s %s", operation, db.charl, table, db.charr, kstr, strings.Join(bvalues, ","), updatestr), params...) if err != nil { return result, err } @@ -295,23 +273,23 @@ func (l *dbLink) batchInsert(table string, list List, batch int, option uint8) ( } // CURD操作:批量数据指定批次量写入 -func (l *dbLink) BatchInsert(table string, list List, batch int) (sql.Result, error) { - return l.link.batchInsert(table, list, batch, OPTION_INSERT) +func (db *Db) BatchInsert(table string, list List, batch int) (sql.Result, error) { + return db.link.batchInsert(table, list, batch, OPTION_INSERT) } // CURD操作:批量数据指定批次量写入, 如果数据存在(主键或者唯一索引),那么删除后重新写入一条 -func (l *dbLink) BatchReplace(table string, list List, batch int) (sql.Result, error) { - return l.link.batchInsert(table, list, batch, OPTION_REPLACE) +func (db *Db) BatchReplace(table string, list List, batch int) (sql.Result, error) { + return db.link.batchInsert(table, list, batch, OPTION_REPLACE) } // CURD操作:批量数据指定批次量写入, 如果数据存在(主键或者唯一索引),那么更新,否则写入一条新数据 -func (l *dbLink) BatchSave(table string, list List, batch int) (sql.Result, error) { - return l.link.batchInsert(table, list, batch, OPTION_SAVE) +func (db *Db) BatchSave(table string, list List, batch int) (sql.Result, error) { + return db.link.batchInsert(table, list, batch, OPTION_SAVE) } // CURD操作:数据更新,统一采用sql预处理 // data参数支持字符串或者关联数组类型,内部会自行做判断处理 -func (l *dbLink) Update(table string, data interface{}, condition interface{}, args ...interface{}) (sql.Result, error) { +func (db *Db) Update(table string, data interface{}, condition interface{}, args ...interface{}) (sql.Result, error) { var params []interface{} var updates string switch data.(type) { @@ -320,7 +298,7 @@ func (l *dbLink) Update(table string, data interface{}, condition interface{}, a case Map: var keys []string for k, v := range data.(Map) { - keys = append(keys, fmt.Sprintf("%s%s%s=?", l.charl, k, l.charr)) + keys = append(keys, fmt.Sprintf("%s%s%s=?", db.charl, k, db.charr)) params = append(params, v) } updates = strings.Join(keys, ",") @@ -337,11 +315,11 @@ func (l *dbLink) Update(table string, data interface{}, condition interface{}, a } } - return l.Exec(fmt.Sprintf("UPDATE %s%s%s SET %s WHERE %s", l.charl, table, l.charr, updates, condition), params...) + return db.Exec(fmt.Sprintf("UPDATE %s%s%s SET %s WHERE %s", db.charl, table, db.charr, updates, condition), params...) } // CURD操作:删除数据 -func (l *dbLink) Delete(table string, condition interface{}, args ...interface{}) (sql.Result, error) { - return l.Exec(fmt.Sprintf("DELETE FROM %s WHERE %s", l.charl, table, l.charr, condition), args...) +func (db *Db) Delete(table string, condition interface{}, args ...interface{}) (sql.Result, error) { + return db.Exec(fmt.Sprintf("DELETE FROM %s WHERE %s", db.charl, table, db.charr, condition), args...) } diff --git a/g/database/gdb/gdb_linkop.go b/g/database/gdb/gdb_linkop.go index b4c3b7709..af704de7b 100644 --- a/g/database/gdb/gdb_linkop.go +++ b/g/database/gdb/gdb_linkop.go @@ -14,7 +14,7 @@ import ( ) // 数据库链式操作对象 -type gLinkOp struct { +type DbOp struct { link Link // 数据库链接对象 tables string // 数据库操作表 fields string // 操作字段 @@ -29,76 +29,76 @@ type gLinkOp struct { } // 链式操作,数据表字段,可支持多个表,以半角逗号连接 -func (l *dbLink) Table(tables string) (*gLinkOp) { - return &gLinkOp { - link : l.link, +func (db *Db) Table(tables string) (*DbOp) { + return &DbOp { + link : db.link, tables: tables, } } // 链式操作,数据表字段,可支持多个表,以半角逗号连接 -func (l *dbLink) From(tables string) (*gLinkOp) { - return l.Table(tables) +func (db *Db) From(tables string) (*DbOp) { + return db.Table(tables) } // 链式操作,左联表 -func (op *gLinkOp) LeftJoin(joinTable string, on string) (*gLinkOp) { +func (op *DbOp) LeftJoin(joinTable string, on string) (*DbOp) { op.tables += fmt.Sprintf(" LEFT JOIN %s ON (%s)", joinTable, on) return op } // 链式操作,右联表 -func (op *gLinkOp) RightJoin(joinTable string, on string) (*gLinkOp) { +func (op *DbOp) RightJoin(joinTable string, on string) (*DbOp) { op.tables += fmt.Sprintf(" RIGHT JOIN %s ON (%s)", joinTable, on) return op } // 链式操作,内联表 -func (op *gLinkOp) InnerJoin(joinTable string, on string) (*gLinkOp) { +func (op *DbOp) InnerJoin(joinTable string, on string) (*DbOp) { op.tables += fmt.Sprintf(" INNER JOIN %s ON (%s)", joinTable, on) return op } // 链式操作,查询字段 -func (op *gLinkOp) Fields(fields string) (*gLinkOp) { +func (op *DbOp) Fields(fields string) (*DbOp) { op.fields = fields return op } // 链式操作,consition -func (op *gLinkOp) Where(where string, args...interface{}) (*gLinkOp) { +func (op *DbOp) Where(where string, args...interface{}) (*DbOp) { op.where = where op.whereArgs = args return op } // 链式操作,group by -func (op *gLinkOp) GroupBy(groupby string) (*gLinkOp) { +func (op *DbOp) GroupBy(groupby string) (*DbOp) { op.groupby = groupby return op } // 链式操作,order by -func (op *gLinkOp) OrderBy(orderby string) (*gLinkOp) { +func (op *DbOp) OrderBy(orderby string) (*DbOp) { op.orderby = orderby return op } // 链式操作,limit -func (op *gLinkOp) Limit(start int, limit int) (*gLinkOp) { +func (op *DbOp) Limit(start int, limit int) (*DbOp) { op.start = start op.limit = limit return op } // 链式操作,操作数据记录项 -func (op *gLinkOp) Data(data interface{}) (*gLinkOp) { +func (op *DbOp) Data(data interface{}) (*DbOp) { op.data = data return op } // 链式操作, CURD - Insert/BatchInsert -func (op *gLinkOp) Insert() (sql.Result, error) { +func (op *DbOp) Insert() (sql.Result, error) { // 批量操作 if list, ok := op.data.(List); ok { batch := 10 @@ -118,7 +118,7 @@ func (op *gLinkOp) Insert() (sql.Result, error) { } // 链式操作, CURD - Replace/BatchReplace -func (op *gLinkOp) Replace() (sql.Result, error) { +func (op *DbOp) Replace() (sql.Result, error) { // 批量操作 if list, ok := op.data.(List); ok { batch := 10 @@ -138,7 +138,7 @@ func (op *gLinkOp) Replace() (sql.Result, error) { } // 链式操作, CURD - Save/BatchSave -func (op *gLinkOp) Save() (sql.Result, error) { +func (op *DbOp) Save() (sql.Result, error) { // 批量操作 if list, ok := op.data.(List); ok { batch := 10 @@ -158,7 +158,7 @@ func (op *gLinkOp) Save() (sql.Result, error) { } // 链式操作, CURD - Update -func (op *gLinkOp) Update() (sql.Result, error) { +func (op *DbOp) Update() (sql.Result, error) { if op.data == nil { return nil, errors.New("updating table with empty data") } @@ -166,7 +166,7 @@ func (op *gLinkOp) Update() (sql.Result, error) { } // 链式操作, CURD - Delete -func (op *gLinkOp) Delete() (sql.Result, error) { +func (op *DbOp) Delete() (sql.Result, error) { if op.where == "" { return nil, errors.New("where is required while deleting") } @@ -174,13 +174,13 @@ func (op *gLinkOp) Delete() (sql.Result, error) { } // 设置批处理的大小 -func (op *gLinkOp) Batch(batch int) *gLinkOp { +func (op *DbOp) Batch(batch int) *DbOp { op.batch = batch return op } // 链式操作,select -func (op *gLinkOp) Select() (List, error) { +func (op *DbOp) Select() (List, error) { if op.fields == "" { op.fields = "*" } @@ -201,12 +201,12 @@ func (op *gLinkOp) Select() (List, error) { } // 链式操作,查询所有记录 -func (op *gLinkOp) All() (List, error) { +func (op *DbOp) All() (List, error) { return op.Select() } // 链式操作,查询单条记录 -func (op *gLinkOp) One() (Map, error) { +func (op *DbOp) One() (Map, error) { list, err := op.All() if err != nil { return nil, err @@ -215,7 +215,7 @@ func (op *gLinkOp) One() (Map, error) { } // 链式操作,查询字段值 -func (op *gLinkOp) Value() (interface{}, error) { +func (op *DbOp) Value() (interface{}, error) { one, err := op.One() if err != nil { return "", err diff --git a/g/database/gdb/gdb_mysql.go b/g/database/gdb/gdb_mysql.go index 70bc6ccdc..e9895d117 100644 --- a/g/database/gdb/gdb_mysql.go +++ b/g/database/gdb/gdb_mysql.go @@ -8,42 +8,41 @@ package gdb import ( - "database/sql" "fmt" - "gitee.com/johng/gf/g/os/glog" + "database/sql" ) // 数据库链接对象 -type mysqlLink struct { - dbLink +type dbmysql struct { + Db } // 创建SQL操作对象,内部采用了lazy link处理 -func (l *mysqlLink) Open (c *ConfigNode) (*sql.DB, error) { +func (db *dbmysql) Open (c *ConfigNode) (*sql.DB, error) { var dbsource string if c.Linkinfo != "" { dbsource = c.Linkinfo } else { dbsource = fmt.Sprintf("%s:%s@tcp(%s:%s)/%s", c.User, c.Pass, c.Host, c.Port, c.Name) } - db, err := sql.Open("mysql", dbsource) - if err != nil { - glog.Fatal(err) + if db, err := sql.Open("mysql", dbsource); err == nil { + return db, nil + } else { + return nil, err } - return db, err } // 获得关键字操作符 - 左 -func (l *mysqlLink) getQuoteCharLeft () string { +func (db *dbmysql) getQuoteCharLeft () string { return "`" } // 获得关键字操作符 - 右 -func (l *mysqlLink) getQuoteCharRight () string { +func (db *dbmysql) getQuoteCharRight () string { return "`" } // 在执行sql之前对sql进行进一步处理 -func (l *mysqlLink) handleSqlBeforeExec(q *string) *string { +func (db *dbmysql) handleSqlBeforeExec(q *string) *string { return q } \ No newline at end of file diff --git a/g/database/gdb/gdb_pgsql.go b/g/database/gdb/gdb_pgsql.go index 49a91cf83..39b56e623 100644 --- a/g/database/gdb/gdb_pgsql.go +++ b/g/database/gdb/gdb_pgsql.go @@ -8,47 +8,46 @@ package gdb import ( - "database/sql" "fmt" "regexp" - "gitee.com/johng/gf/g/os/glog" + "database/sql" ) // postgresql的适配 // @todo 需要完善replace和save的操作覆盖 // 数据库链接对象 -type pgsqlLink struct { - dbLink +type dbpgsql struct { + Db } // 创建SQL操作对象,内部采用了lazy link处理 -func (l *pgsqlLink) Open (c *ConfigNode) (*sql.DB, error) { +func (db *dbpgsql) Open (c *ConfigNode) (*sql.DB, error) { var dbsource string if c.Linkinfo != "" { dbsource = c.Linkinfo } else { dbsource = fmt.Sprintf("user=%s password=%s host=%s port=%s dbname=%s", c.User, c.Pass, c.Host, c.Port, c.Name) } - db, err := sql.Open("postgres", dbsource) - if err != nil { - glog.Fatal(err) + if db, err := sql.Open("postgres", dbsource); err == nil { + return db, nil + } else { + return nil, err } - return db, err } // 获得关键字操作符 - 左 -func (l *pgsqlLink) getQuoteCharLeft () string { +func (db *dbpgsql) getQuoteCharLeft () string { return "\"" } // 获得关键字操作符 - 右 -func (l *pgsqlLink) getQuoteCharRight () string { +func (db *dbpgsql) getQuoteCharRight () string { return "\"" } // 在执行sql之前对sql进行进一步处理 -func (l *pgsqlLink) handleSqlBeforeExec(q *string) *string { +func (db *dbpgsql) handleSqlBeforeExec(q *string) *string { reg := regexp.MustCompile("\\?") index := 0 str := reg.ReplaceAllStringFunc(*q, func (s string) string { diff --git a/g/database/gdb/gdb_transaction.go b/g/database/gdb/gdb_transaction.go new file mode 100644 index 000000000..1bc20e823 --- /dev/null +++ b/g/database/gdb/gdb_transaction.go @@ -0,0 +1,276 @@ +// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +package gdb + +import ( + "fmt" + "errors" + "database/sql" + "gitee.com/johng/gf/g/os/glog" + "gitee.com/johng/gf/g/os/gcache" + "gitee.com/johng/gf/g/util/grand" + _ "github.com/lib/pq" + _ "github.com/go-sql-driver/mysql" + "strings" +) + +// 数据库事务对象 +type Tx struct { + db Db + tx *sql.Tx + + //Query(q string, args ...interface{}) (*sql.Rows, error) + //Exec(q string, args ...interface{}) (sql.Result, error) + //Prepare(q string) (*sql.Stmt, error) + // + //GetAll(q string, args ...interface{}) (List, error) + //GetOne(q string, args ...interface{}) (Map, error) + //GetValue(q string, args ...interface{}) (interface{}, error) + // + // + //Insert(table string, data Map) (sql.Result, error) + //Replace(table string, data Map) (sql.Result, error) + //Save(table string, data Map) (sql.Result, error) + // + //BatchInsert(table string, list List, batch int) (sql.Result, error) + //BatchReplace(table string, list List, batch int) (sql.Result, error) + //BatchSave(table string, list List, batch int) (sql.Result, error) + // + //Update(table string, data interface{}, condition interface{}, args ...interface{}) (sql.Result, error) + //Delete(table string, condition interface{}, args ...interface{}) (sql.Result, error) + // + //Table(tables string) (*gLinkOp) + //From(tables string) (*gLinkOp) +} + +// 数据库sql查询操作,主要执行查询 +func (tx *Tx) Query(q string, args ...interface{}) (*sql.Rows, error) { + p := tx.db.link.handleSqlBeforeExec(&q) + rows, err := tx.tx.Query(*p, args ...) + err = tx.db.formatError(err, p, args...) + if err == nil { + return rows, nil + } + return nil, err +} + +// 执行一条sql,并返回执行情况,主要用于非查询操作 +func (tx *Tx) Exec(q string, args ...interface{}) (sql.Result, error) { + p := tx.db.link.handleSqlBeforeExec(&q) + r, err := tx.tx.Exec(*p, args ...) + err = tx.db.formatError(err, p, args...) + return r, err +} + +// 数据库查询,获取查询结果集,以列表结构返回 +func (tx *Tx) GetAll(q string, args ...interface{}) (List, error) { + // 执行sql + rows, err := tx.Query(q, args ...) + if err != nil || rows == nil { + return nil, err + } + // 列名称列表 + columns, err := rows.Columns() + if err != nil { + return nil, err + } + // 返回结构组装 + values := make([]sql.RawBytes, len(columns)) + scanArgs := make([]interface{}, len(values)) + var list List + for i := range values { + scanArgs[i] = &values[i] + } + for rows.Next() { + err = rows.Scan(scanArgs...) + if err != nil { + return list, err + } + row := make(Map) + for i, col := range values { + row[columns[i]] = string(col) + } + list = append(list, row) + } + return list, nil +} + +// 数据库查询,获取查询结果集,以关联数组结构返回 +func (tx *Tx) GetOne(q string, args ...interface{}) (Map, error) { + list, err := tx.GetAll(q, args ...) + if err != nil { + return nil, err + } + return list[0], nil +} + +// 数据库查询,获取查询字段值 +func (tx *Tx) GetValue(q string, args ...interface{}) (interface{}, error) { + one, err := tx.GetOne(q, args ...) + if err != nil { + return "", err + } + for _, v := range one { + return v, nil + } + return "", nil +} + +// sql预处理,执行完成后调用返回值sql.Stmt.Exec完成sql操作 +// 记得调用sql.Stmt.Close关闭操作对象 +func (tx *Tx) Prepare(q string) (*sql.Stmt, error) { + return tx.Prepare(q) +} + +// insert、replace, save, ignore操作 +// 0: insert: 仅仅执行写入操作,如果存在冲突的主键或者唯一索引,那么报错返回 +// 1: replace: 如果数据存在(主键或者唯一索引),那么删除后重新写入一条 +// 2: save: 如果数据存在(主键或者唯一索引),那么更新,否则写入一条新数据 +// 3: ignore: 如果数据存在(主键或者唯一索引),那么什么也不做 +func (tx *Tx) insert(table string, data Map, option uint8) (sql.Result, error) { + var keys []string + var values []string + var params []interface{} + for k, v := range data { + keys = append(keys, tx.db.charl + k + tx.db.charr) + values = append(values, "?") + params = append(params, v) + } + operation := tx.db.getInsertOperationByOption(option) + updatestr := "" + if option == OPTION_SAVE { + var updates []string + for k, _ := range data { + updates = append(updates, fmt.Sprintf("%s%s%s=VALUES(%s)", db.charl, k, db.charr, k)) + } + updatestr = fmt.Sprintf(" ON DUPLICATE KEY UPDATE %s", strings.Join(updates, ",")) + } + return tx.Exec( + fmt.Sprintf("%s INTO %s%s%s(%s) VALUES(%s) %s", + operation, tx.db.charl, table, db.charr, strings.Join(keys, ","), strings.Join(values, ","), updatestr), params... + ) +} + +// CURD操作:单条数据写入, 仅仅执行写入操作,如果存在冲突的主键或者唯一索引,那么报错返回 +func (tx *Tx) Insert(table string, data Map) (sql.Result, error) { + return db.link.insert(table, data, OPTION_INSERT) +} + +// CURD操作:单条数据写入, 如果数据存在(主键或者唯一索引),那么删除后重新写入一条 +func (tx *Tx) Replace(table string, data Map) (sql.Result, error) { + return db.link.insert(table, data, OPTION_REPLACE) +} + +// CURD操作:单条数据写入, 如果数据存在(主键或者唯一索引),那么更新,否则写入一条新数据 +func (tx *Tx) Save(table string, data Map) (sql.Result, error) { + return db.link.insert(table, data, OPTION_SAVE) +} + +// 批量写入数据 +func (tx *Tx) batchInsert(table string, list List, batch int, option uint8) (sql.Result, error) { + var keys []string + var values []string + var bvalues []string + var params []interface{} + var result sql.Result + var size = len(list) + // 判断长度 + if size < 1 { + return result, errors.New("empty data list") + } + // 首先获取字段名称及记录长度 + for k, _ := range list[0] { + keys = append(keys, k) + values = append(values, "?") + } + var kstr = db.charl + strings.Join(keys, db.charl + "," + db.charr) + db.charr + // 操作判断 + operation := db.getInsertOperationByOption(option) + updatestr := "" + if option == OPTION_SAVE { + var updates []string + for _, k := range keys { + updates = append(updates, fmt.Sprintf("%s=VALUES(%s)", db.charl, k, db.charr, k)) + } + updatestr = fmt.Sprintf(" ON DUPLICATE KEY UPDATE %s", strings.Join(updates, ",")) + } + // 构造批量写入数据格式(注意map的遍历是无序的) + for i := 0; i < size; i++ { + for _, k := range keys { + params = append(params, list[i][k]) + } + bvalues = append(bvalues, "(" + strings.Join(values, ",") + ")") + if len(bvalues) == batch { + r, err := db.Exec(fmt.Sprintf("%s INTO %s%s%s(%s) VALUES%s %s", operation, db.charl, table, db.charr, kstr, strings.Join(bvalues, ","), updatestr), params...) + if err != nil { + return result, err + } + result = r + bvalues = bvalues[:0] + } + } + // 处理最后不构成指定批量的数据 + if len(bvalues) > 0 { + r, err := db.Exec(fmt.Sprintf("%s INTO %s%s%s(%s) VALUES%s %s", operation, db.charl, table, db.charr, kstr, strings.Join(bvalues, ","), updatestr), params...) + if err != nil { + return result, err + } + result = r + } + return result, nil +} + +// CURD操作:批量数据指定批次量写入 +func (tx *Tx) BatchInsert(table string, list List, batch int) (sql.Result, error) { + return db.link.batchInsert(table, list, batch, OPTION_INSERT) +} + +// CURD操作:批量数据指定批次量写入, 如果数据存在(主键或者唯一索引),那么删除后重新写入一条 +func (tx *Tx) BatchReplace(table string, list List, batch int) (sql.Result, error) { + return db.link.batchInsert(table, list, batch, OPTION_REPLACE) +} + +// CURD操作:批量数据指定批次量写入, 如果数据存在(主键或者唯一索引),那么更新,否则写入一条新数据 +func (tx *Tx) BatchSave(table string, list List, batch int) (sql.Result, error) { + return db.link.batchInsert(table, list, batch, OPTION_SAVE) +} + +// CURD操作:数据更新,统一采用sql预处理 +// data参数支持字符串或者关联数组类型,内部会自行做判断处理 +func (tx *Tx) Update(table string, data interface{}, condition interface{}, args ...interface{}) (sql.Result, error) { + var params []interface{} + var updates string + switch data.(type) { + case string: + updates = data.(string) + case Map: + var keys []string + for k, v := range data.(Map) { + keys = append(keys, fmt.Sprintf("%s%s%s=?", db.charl, k, db.charr)) + params = append(params, v) + } + updates = strings.Join(keys, ",") + + default: + return nil, errors.New("invalid data type for 'data' field, string or Map expected") + } + for _, v := range args { + if r, ok := v.(string); ok { + params = append(params, r) + } else if r, ok := v.(int); ok { + params = append(params, string(r)) + } else { + + } + } + return db.Exec(fmt.Sprintf("UPDATE %s%s%s SET %s WHERE %s", db.charl, table, db.charr, updates, condition), params...) +} + +// CURD操作:删除数据 +func (tx *Tx) Delete(table string, condition interface{}, args ...interface{}) (sql.Result, error) { + return db.Exec(fmt.Sprintf("DELETE FROM %s WHERE %s", db.charl, table, db.charr, condition), args...) +} diff --git a/geg/database/mysql/mysql.go b/geg/database/mysql/mysql.go index f7def237f..658ccdba3 100644 --- a/geg/database/mysql/mysql.go +++ b/geg/database/mysql/mysql.go @@ -376,23 +376,25 @@ func linkopBatchSave() { // 事务操作示例1 func transaction1() { fmt.Println("transaction1:") - db.Begin() - r, err := db.Save("user", gdb.Map{ - "uid" : 1, - "name" : "john", - }) - db.Rollback() - fmt.Println(r, err) + if tx, err := db.Begin(); err == nil { + r, err := db.Save("user", gdb.Map{ + "uid" : 1, + "name" : "john", + }) + tx.Rollback() + fmt.Println(r, err) + } fmt.Println() } // 事务操作示例2 func transaction2() { fmt.Println("transaction2:") - db.Begin() - r, err := db.Table("user").Data(gdb.Map{"uid":1, "name": "john_1"}).Save() - db.Commit() - fmt.Println(r, err) + if tx, err := db.Begin(); err == nil { + r, err := db.Table("user").Data(gdb.Map{"uid":1, "name": "john_1"}).Save() + tx.Commit() + fmt.Println(r, err) + } fmt.Println() } @@ -454,5 +456,5 @@ func main() { //linkopUpdate3() //keepPing() transaction1() - transaction2() + //transaction2() } \ No newline at end of file