diff --git a/contrib/drivers/clickhouse/clickhouse.go b/contrib/drivers/clickhouse/clickhouse.go index a5fc94de8..f6cb9cff8 100644 --- a/contrib/drivers/clickhouse/clickhouse.go +++ b/contrib/drivers/clickhouse/clickhouse.go @@ -13,6 +13,7 @@ import ( "errors" "fmt" "github.com/ClickHouse/clickhouse-go" + "strings" "github.com/gogf/gf/v2/container/gmap" "github.com/gogf/gf/v2/database/gdb" @@ -34,6 +35,8 @@ var ( ErrUnsupportedInsertIgnore = errors.New("unsupported method:InsertIgnore") ErrUnsupportedInsertGetId = errors.New("unsupported method:InsertGetId") ErrUnsupportedReplace = errors.New("unsupported method:Replace") + ErrUnsupportedBegin = errors.New("unsupported method:Begin") + ErrUnsupportedTransaction = errors.New("unsupported method:Transaction") ) func init() { @@ -201,12 +204,6 @@ func (d *Driver) ping(conn *sql.DB) error { return err } -// Transaction Clickhouse does not support transactions -// So when you call this method you get an error. -func (d *Driver) Transaction(ctx context.Context, f func(ctx context.Context, tx *gdb.TX) error) error { - return errors.New("transaction operations are not supported") -} - // DoUpdateSQL in clickhouse ,use update must use alter // eg. // ALTER TABLE [db.]table UPDATE column1 = expr1 [, ...] WHERE filter_expr @@ -228,8 +225,50 @@ func (d *Driver) DoCommit(ctx context.Context, in gdb.DoCommitInput) (out gdb.Do } func (d *Driver) DoInsert(ctx context.Context, link gdb.Link, table string, list gdb.List, option gdb.DoInsertOption) (result sql.Result, err error) { - option.IsIgnoreResult = true - return d.Core.DoInsert(ctx, link, table, list, option) + var ( + keys []string // Field names. + valueHolder = make([]string, 0) + ) + // Handle the field names and placeholders. + for k := range list[0] { + keys = append(keys, k) + valueHolder = append(valueHolder, "?") + } + // Prepare the batch result pointer. + var ( + charL, charR = d.Core.GetChars() + batchResult = new(gdb.SqlResult) + keysStr = charL + strings.Join(keys, charR+","+charL) + charR + holderStr = strings.Join(valueHolder, ",") + listLength = len(list) + tx = &gdb.TX{} + stdSqlResult sql.Result + stmt *gdb.Stmt + ) + tx, err = d.Core.Begin(ctx) + if err != nil { + return + } + stmt, err = tx.Prepare(fmt.Sprintf( + "INSERT INTO %s(%s) VALUES (%s)", + d.QuotePrefixTableName(table), keysStr, + holderStr, + )) + if err != nil { + return + } + for i := 0; i < listLength; i++ { + params := []interface{}{} // Values that will be committed to underlying database driver. + for _, k := range keys { + params = append(params, list[i][k]) + } + // Prepare is allowed to execute only once in a transaction opened by clickhouse + stdSqlResult, err = stmt.ExecContext(ctx, params...) + if err != nil { + return stdSqlResult, err + } + } + return batchResult, tx.Commit() } // InsertIgnore Other queries for modifying data parts are not supported: REPLACE, MERGE, UPSERT, INSERT UPDATE. @@ -246,3 +285,11 @@ func (d *Driver) InsertAndGetId(ctx context.Context, table string, data interfac func (d *Driver) Replace(ctx context.Context, table string, data interface{}, batch ...int) (sql.Result, error) { return nil, ErrUnsupportedReplace } + +func (d *Driver) Begin(ctx context.Context) (tx *gdb.TX, err error) { + return nil, ErrUnsupportedBegin +} + +func (d *Driver) Transaction(ctx context.Context, f func(ctx context.Context, tx *gdb.TX) error) error { + return ErrUnsupportedTransaction +} diff --git a/contrib/drivers/clickhouse/clickhouse_test.go b/contrib/drivers/clickhouse/clickhouse_test.go index 8f867be83..821af332d 100644 --- a/contrib/drivers/clickhouse/clickhouse_test.go +++ b/contrib/drivers/clickhouse/clickhouse_test.go @@ -129,9 +129,9 @@ func TestDriverClickhouse_DoInsertOne(t *testing.T) { defer dropClickhouseTable(connect) _, err := connect.Model("visits").Data(g.Map{ "id": grand.Intn(999), - "duration": grand.Intn(999), - "url": grand.Intn(999), - "created": grand.Intn(999), + "duration": float64(grand.Intn(999)), + "url": gconv.String(grand.Intn(999)), + "created": time.Now().Format("2006-01-02 15:04:05"), }).Insert() gtest.AssertNil(err) } @@ -141,23 +141,13 @@ func TestDriver_DoInsertMany(t *testing.T) { gtest.AssertEQ(createClickhouseTable(connect), nil) defer dropClickhouseTable(connect) tx, err := connect.Begin(context.Background()) - for i := 0; i < 10; i++ { - _, err = tx.Model("visits").Data(g.Map{ - "id": grand.Intn(999), - "duration": float64(grand.Intn(999)), - "url": gconv.String(grand.Intn(999)), - "created": time.Now().Format("2006-01-02 15:04:05"), - }). - Save() - gtest.AssertNil(err) - } - gtest.AssertNil(tx.Commit()) + gtest.AssertEQ(err, ErrUnsupportedBegin) + gtest.AssertNil(tx) } func TestDriverClickhouse_DoInsert(t *testing.T) { connect := InitClickhouse() gtest.AssertEQ(createClickhouseTable(connect), nil) - defer dropClickhouseTable(connect) type insertItem struct { Id int `orm:"id"` Duration float64 `orm:"duration"` @@ -165,27 +155,24 @@ func TestDriverClickhouse_DoInsert(t *testing.T) { Created string `orm:"created"` } var ( - ctx = context.Background() insertUrl = "https://goframe.org" - // insert one data - item = insertItem{ + total = 0 + item = insertItem{ Id: 0, Duration: 1, Url: insertUrl, Created: time.Now().Format("2006-01-02 15:04:05"), } ) - _, err := connect.Model("visits").Ctx(ctx).Data(item).Insert() + _, err := connect.Model("visits").Data(item).Insert() gtest.AssertNil(err) - _, err = connect.Model("visits").Ctx(ctx).Data(item).InsertIgnore() + _, err = connect.Model("visits").Data(item).Save() gtest.AssertNil(err) - - _, err = connect.Model("visits").Ctx(ctx).Data(item).InsertAndGetId() - _, err = connect.Model("visits").Ctx(ctx).Data(item).Save() + total, err = connect.Model("visits").Count() gtest.AssertNil(err) - // insert array data + gtest.AssertEQ(total, 2) list := []*insertItem{} - for i := 0; i < 999; i++ { + for i := 0; i < 50; i++ { list = append(list, &insertItem{ Id: grand.Intn(999), Duration: float64(grand.Intn(999)), @@ -193,13 +180,14 @@ func TestDriverClickhouse_DoInsert(t *testing.T) { Created: time.Now().Format("2006-01-02 15:04:05"), }) } - _, err = connect.Model("visits").Ctx(ctx).Data(list).Insert() + _, err = connect.Model("visits").Data(list).Insert() gtest.AssertNil(err) - _, err = connect.Model("visits").Ctx(ctx).Data(list).InsertIgnore() + _, err = connect.Model("visits").Data(list).Save() gtest.AssertNil(err) - _, err = connect.Model("visits").Ctx(ctx).Data(list).InsertAndGetId() - _, err = connect.Model("visits").Ctx(ctx).Data(list).Save() + total, err = connect.Model("visits").Count() gtest.AssertNil(err) + gtest.AssertEQ(total, 102) + dropClickhouseTable(connect) } func TestDriverClickhouse_DoExec(t *testing.T) { diff --git a/database/gdb/gdb.go b/database/gdb/gdb.go index ecbd8cfab..85b8f0e7d 100644 --- a/database/gdb/gdb.go +++ b/database/gdb/gdb.go @@ -242,9 +242,8 @@ type Sql struct { type DoInsertOption struct { OnDuplicateStr string OnDuplicateMap map[string]interface{} - InsertOption int // Insert operation. - BatchCount int // Batch count for batch inserting. - IsIgnoreResult bool // IgnoreResult + InsertOption int // Insert operation. + BatchCount int // Batch count for batch inserting. } // TableField is the struct for table field. diff --git a/database/gdb/gdb_core.go b/database/gdb/gdb_core.go index e92e1be94..3290d2ec2 100644 --- a/database/gdb/gdb_core.go +++ b/database/gdb/gdb_core.go @@ -416,7 +416,7 @@ func (c *Core) DoInsert(ctx context.Context, link Link, table string, list List, operation = GetInsertOperationByOption(option.InsertOption) ) if option.InsertOption == InsertOptionSave { - onDuplicateStr = c.formatOnDuplicate(keys, option) + onDuplicateStr = c.FormatOnDuplicate(keys, option) } var ( listLength = len(list) @@ -450,14 +450,12 @@ func (c *Core) DoInsert(ctx context.Context, link Link, table string, list List, if err != nil { return stdSqlResult, err } - if !option.IsIgnoreResult { - if affectedRows, err = stdSqlResult.RowsAffected(); err != nil { - err = gerror.WrapCode(gcode.CodeDbOperationError, err, `sql.Result.RowsAffected failed`) - return stdSqlResult, err - } else { - batchResult.Result = stdSqlResult - batchResult.Affected += affectedRows - } + if affectedRows, err = stdSqlResult.RowsAffected(); err != nil { + err = gerror.WrapCode(gcode.CodeDbOperationError, err, `sql.Result.RowsAffected failed`) + return stdSqlResult, err + } else { + batchResult.Result = stdSqlResult + batchResult.Affected += affectedRows } params = params[:0] valueHolder = valueHolder[:0] @@ -466,7 +464,7 @@ func (c *Core) DoInsert(ctx context.Context, link Link, table string, list List, return batchResult, nil } -func (c *Core) formatOnDuplicate(columns []string, option DoInsertOption) string { +func (c *Core) FormatOnDuplicate(columns []string, option DoInsertOption) string { var onDuplicateStr string if option.OnDuplicateStr != "" { onDuplicateStr = option.OnDuplicateStr