fix(database/gdb): fix context canceled error in transaction due to usage of TransTimeout configuration (#4037)

This commit is contained in:
John Guo
2024-12-17 21:15:54 +08:00
committed by GitHub
parent 0c2d5cac19
commit f79aef6669
12 changed files with 327 additions and 107 deletions

View File

@ -528,24 +528,53 @@ type dynamicConfig struct {
// DoCommitInput is the input parameters for function DoCommit.
type DoCommitInput struct {
Db *sql.DB
Tx *sql.Tx
Stmt *sql.Stmt
Link Link
Sql string
Args []interface{}
Type SqlType
TxOptions sql.TxOptions
// Db is the underlying database connection object.
Db *sql.DB
// Tx is the underlying transaction object.
Tx *sql.Tx
// Stmt is the prepared statement object.
Stmt *sql.Stmt
// Link is the common database function wrapper interface.
Link Link
// Sql is the SQL string to be executed.
Sql string
// Args is the arguments for SQL placeholders.
Args []interface{}
// Type indicates the type of SQL operation.
Type SqlType
// TxOptions specifies the transaction options.
TxOptions sql.TxOptions
// TxCancelFunc is the context cancel function for transaction.
TxCancelFunc context.CancelFunc
// IsTransaction indicates whether current operation is in transaction.
IsTransaction bool
}
// DoCommitOutput is the output parameters for function DoCommit.
type DoCommitOutput struct {
Result sql.Result // Result is the result of exec statement.
Records []Record // Records is the result of query statement.
Stmt *Stmt // Stmt is the Statement object result for Prepare.
Tx TX // Tx is the transaction object result for Begin.
RawResult interface{} // RawResult is the underlying result, which might be sql.Result/*sql.Rows/*sql.Row.
// Result is the result of exec statement.
Result sql.Result
// Records is the result of query statement.
Records []Record
// Stmt is the Statement object result for Prepare.
Stmt *Stmt
// Tx is the transaction object result for Begin.
Tx TX
// RawResult is the underlying result, which might be sql.Result/*sql.Rows/*sql.Row.
RawResult interface{}
}
// Driver is the interface for integrating sql drivers into package gdb.
@ -581,43 +610,84 @@ type Sql struct {
// DoInsertOption is the input struct for function DoInsert.
type DoInsertOption struct {
OnDuplicateStr string // Custom string for `on duplicated` statement.
OnDuplicateMap map[string]interface{} // Custom key-value map from `OnDuplicateEx` function for `on duplicated` statement.
OnConflict []string // Custom conflict key of upsert clause, if the database needs it.
InsertOption InsertOption // Insert operation in constant value.
BatchCount int // Batch count for batch inserting.
// OnDuplicateStr is the custom string for `on duplicated` statement.
OnDuplicateStr string
// OnDuplicateMap is the custom key-value map from `OnDuplicateEx` function for `on duplicated` statement.
OnDuplicateMap map[string]interface{}
// OnConflict is the custom conflict key of upsert clause, if the database needs it.
OnConflict []string
// InsertOption is the insert operation in constant value.
InsertOption InsertOption
// BatchCount is the batch count for batch inserting.
BatchCount int
}
// TableField is the struct for table field.
type TableField struct {
Index int // For ordering purpose as map is unordered.
Name string // Field name.
Type string // Field type. Eg: 'int(10) unsigned', 'varchar(64)'.
Null bool // Field can be null or not.
Key string // The index information(empty if it's not an index). Eg: PRI, MUL.
Default interface{} // Default value for the field.
Extra string // Extra information. Eg: auto_increment.
Comment string // Field comment.
// Index is for ordering purpose as map is unordered.
Index int
// Name is the field name.
Name string
// Type is the field type. Eg: 'int(10) unsigned', 'varchar(64)'.
Type string
// Null is whether the field can be null or not.
Null bool
// Key is the index information(empty if it's not an index). Eg: PRI, MUL.
Key string
// Default is the default value for the field.
Default interface{}
// Extra is the extra information. Eg: auto_increment.
Extra string
// Comment is the field comment.
Comment string
}
// Counter is the type for update count.
// Counter is the type for update count.
type Counter struct {
// Field is the field name.
Field string
// Value is the value.
Value float64
}
type (
Raw string // Raw is a raw sql that will not be treated as argument but as a direct sql part.
Value = *gvar.Var // Value is the field value type.
Record map[string]Value // Record is the row record of the table.
Result []Record // Result is the row record array.
Map = map[string]interface{} // Map is alias of map[string]interface{}, which is the most common usage map type.
List = []Map // List is type of map array.
// Raw is a raw sql that will not be treated as argument but as a direct sql part.
Raw string
// Value is the field value type.
Value = *gvar.Var
// Record is the row record of the table.
Record map[string]Value
// Result is the row record array.
Result []Record
// Map is alias of map[string]interface{}, which is the most common usage map type.
Map = map[string]interface{}
// List is type of map array.
List = []Map
)
type CatchSQLManager struct {
// SQLArray is the array of sql.
SQLArray *garray.StrArray
DoCommit bool // DoCommit marks it will be committed to underlying driver or not.
// DoCommit marks it will be committed to underlying driver or not.
DoCommit bool
}
const (

View File

@ -92,7 +92,6 @@ func (c *Core) GetCtxTimeout(ctx context.Context, timeoutType ctxTimeoutType) (c
if c.db.GetConfig().PrepareTimeout > 0 {
return context.WithTimeout(ctx, config.PrepareTimeout)
}
case ctxTimeoutTypeTrans:
if c.db.GetConfig().TranTimeout > 0 {
return context.WithTimeout(ctx, config.TranTimeout)

View File

@ -27,36 +27,126 @@ type ConfigGroup []ConfigNode
// ConfigNode is configuration for one node.
type ConfigNode struct {
Host string `json:"host"` // Host of server, ip or domain like: 127.0.0.1, localhost
Port string `json:"port"` // Port, it's commonly 3306.
User string `json:"user"` // Authentication username.
Pass string `json:"pass"` // Authentication password.
Name string `json:"name"` // Default used database name.
Type string `json:"type"` // Database type: mysql, mariadb, sqlite, mssql, pgsql, oracle, clickhouse, dm.
Link string `json:"link"` // (Optional) Custom link information for all configuration in one single string.
Extra string `json:"extra"` // (Optional) Extra configuration according the registered third-party database driver.
Role string `json:"role"` // (Optional, "master" in default) Node role, used for master-slave mode: master, slave.
Debug bool `json:"debug"` // (Optional) Debug mode enables debug information logging and output.
Prefix string `json:"prefix"` // (Optional) Table prefix.
DryRun bool `json:"dryRun"` // (Optional) Dry run, which does SELECT but no INSERT/UPDATE/DELETE statements.
Weight int `json:"weight"` // (Optional) Weight for load balance calculating, it's useless if there's just one node.
Charset string `json:"charset"` // (Optional, "utf8" in default) Custom charset when operating on database.
Protocol string `json:"protocol"` // (Optional, "tcp" in default) See net.Dial for more information which networks are available.
Timezone string `json:"timezone"` // (Optional) Sets the time zone for displaying and interpreting time stamps.
Namespace string `json:"namespace"` // (Optional) Namespace for some databases. Eg, in pgsql, the `Name` acts as the `catalog`, the `NameSpace` acts as the `schema`.
MaxIdleConnCount int `json:"maxIdle"` // (Optional) Max idle connection configuration for underlying connection pool.
MaxOpenConnCount int `json:"maxOpen"` // (Optional) Max open connection configuration for underlying connection pool.
MaxConnLifeTime time.Duration `json:"maxLifeTime"` // (Optional) Max amount of time a connection may be idle before being closed.
QueryTimeout time.Duration `json:"queryTimeout"` // (Optional) Max query time for per dql.
ExecTimeout time.Duration `json:"execTimeout"` // (Optional) Max exec time for dml.
TranTimeout time.Duration `json:"tranTimeout"` // (Optional) Max exec time for a transaction.
PrepareTimeout time.Duration `json:"prepareTimeout"` // (Optional) Max exec time for prepare operation.
CreatedAt string `json:"createdAt"` // (Optional) The field name of table for automatic-filled created datetime.
UpdatedAt string `json:"updatedAt"` // (Optional) The field name of table for automatic-filled updated datetime.
DeletedAt string `json:"deletedAt"` // (Optional) The field name of table for automatic-filled updated datetime.
TimeMaintainDisabled bool `json:"timeMaintainDisabled"` // (Optional) Disable the automatic time maintaining feature.
// Host specifies the server address, can be either IP address or domain name
// Example: "127.0.0.1", "localhost"
Host string `json:"host"`
// Port specifies the server port number
// Default is typically "3306" for MySQL
Port string `json:"port"`
// User specifies the authentication username for database connection
User string `json:"user"`
// Pass specifies the authentication password for database connection
Pass string `json:"pass"`
// Name specifies the default database name to be used
Name string `json:"name"`
// Type specifies the database type
// Example: mysql, mariadb, sqlite, mssql, pgsql, oracle, clickhouse, dm.
Type string `json:"type"`
// Link provides custom connection string that combines all configuration in one string
// Optional field
Link string `json:"link"`
// Extra provides additional configuration options for third-party database drivers
// Optional field
Extra string `json:"extra"`
// Role specifies the node role in master-slave setup
// Optional field, defaults to "master"
// Available values: "master", "slave"
Role Role `json:"role"`
// Debug enables debug mode for logging and output
// Optional field
Debug bool `json:"debug"`
// Prefix specifies the table name prefix
// Optional field
Prefix string `json:"prefix"`
// DryRun enables simulation mode where SELECT statements are executed
// but INSERT/UPDATE/DELETE statements are not
// Optional field
DryRun bool `json:"dryRun"`
// Weight specifies the node weight for load balancing calculations
// Optional field, only effective in multi-node setups
Weight int `json:"weight"`
// Charset specifies the character set for database operations
// Optional field, defaults to "utf8"
Charset string `json:"charset"`
// Protocol specifies the network protocol for database connection
// Optional field, defaults to "tcp"
// See net.Dial for available network protocols
Protocol string `json:"protocol"`
// Timezone sets the time zone for timestamp interpretation and display
// Optional field
Timezone string `json:"timezone"`
// Namespace specifies the schema namespace for certain databases
// Optional field, e.g., in PostgreSQL, Name is the catalog and Namespace is the schema
Namespace string `json:"namespace"`
// MaxIdleConnCount specifies the maximum number of idle connections in the pool
// Optional field
MaxIdleConnCount int `json:"maxIdle"`
// MaxOpenConnCount specifies the maximum number of open connections in the pool
// Optional field
MaxOpenConnCount int `json:"maxOpen"`
// MaxConnLifeTime specifies the maximum lifetime of a connection
// Optional field
MaxConnLifeTime time.Duration `json:"maxLifeTime"`
// QueryTimeout specifies the maximum execution time for DQL operations
// Optional field
QueryTimeout time.Duration `json:"queryTimeout"`
// ExecTimeout specifies the maximum execution time for DML operations
// Optional field
ExecTimeout time.Duration `json:"execTimeout"`
// TranTimeout specifies the maximum execution time for a transaction block
// Optional field
TranTimeout time.Duration `json:"tranTimeout"`
// PrepareTimeout specifies the maximum execution time for prepare operations
// Optional field
PrepareTimeout time.Duration `json:"prepareTimeout"`
// CreatedAt specifies the field name for automatic timestamp on record creation
// Optional field
CreatedAt string `json:"createdAt"`
// UpdatedAt specifies the field name for automatic timestamp on record updates
// Optional field
UpdatedAt string `json:"updatedAt"`
// DeletedAt specifies the field name for automatic timestamp on record deletion
// Optional field
DeletedAt string `json:"deletedAt"`
// TimeMaintainDisabled controls whether automatic time maintenance is disabled
// Optional field
TimeMaintainDisabled bool `json:"timeMaintainDisabled"`
}
type Role string
const (
RoleMaster Role = "master"
RoleSlave Role = "slave"
)
const (
DefaultGroupName = "default" // Default group name.
)

View File

@ -20,13 +20,30 @@ import (
// TXCore is the struct for transaction management.
type TXCore struct {
db DB // db is the current gdb database manager.
tx *sql.Tx // tx is the raw and underlying transaction manager.
ctx context.Context // ctx is the context for this transaction only.
master *sql.DB // master is the raw and underlying database manager.
transactionId string // transactionId is a unique id generated by this object for this transaction.
transactionCount int // transactionCount marks the times that Begins.
isClosed bool // isClosed marks this transaction has already been committed or rolled back.
// db is the database management interface that implements the DB interface,
// providing access to database operations and configuration.
db DB
// tx is the underlying SQL transaction object from database/sql package,
// which manages the actual transaction operations.
tx *sql.Tx
// ctx is the context specific to this transaction,
// which can be used for timeout control and cancellation.
ctx context.Context
// master is the underlying master database connection pool,
// used for direct database operations when needed.
master *sql.DB
// transactionId is a unique identifier for this transaction instance,
// used for tracking and debugging purposes.
transactionId string
// transactionCount tracks the number of nested transaction begins,
// used for managing transaction nesting depth.
transactionCount int
// isClosed indicates whether this transaction has been finalized
// through either a commit or rollback operation.
isClosed bool
// cancelFunc is the context cancellation function associated with ctx,
// used to cancel the transaction context when needed.
cancelFunc context.CancelFunc
}
// transactionKeyForNestedPoint forms and returns the transaction key at current save point.
@ -73,6 +90,7 @@ func (tx *TXCore) Commit() error {
Tx: tx.tx,
Sql: "COMMIT",
Type: SqlTypeTXCommit,
TxCancelFunc: tx.cancelFunc,
IsTransaction: true,
})
if err == nil {
@ -94,6 +112,7 @@ func (tx *TXCore) Rollback() error {
Tx: tx.tx,
Sql: "ROLLBACK",
Type: SqlTypeTXRollback,
TxCancelFunc: tx.cancelFunc,
IsTransaction: true,
})
if err == nil {

View File

@ -51,12 +51,6 @@ func (c *Core) DoQuery(ctx context.Context, link Link, sql string, args ...inter
}
}
if c.db.GetConfig().QueryTimeout > 0 {
var cancelFunc context.CancelFunc
ctx, cancelFunc = context.WithTimeout(ctx, c.db.GetConfig().QueryTimeout)
defer cancelFunc()
}
// Sql filtering.
sql, args = c.FormatSqlBeforeExecuting(sql, args)
sql, args, err = c.db.DoFilter(ctx, link, sql, args)
@ -115,12 +109,6 @@ func (c *Core) DoExec(ctx context.Context, link Link, sql string, args ...interf
}
}
if c.db.GetConfig().ExecTimeout > 0 {
var cancelFunc context.CancelFunc
ctx, cancelFunc = context.WithTimeout(ctx, c.db.GetConfig().ExecTimeout)
defer cancelFunc()
}
// SQL filtering.
sql, args = c.FormatSqlBeforeExecuting(sql, args)
sql, args, err = c.db.DoFilter(ctx, link, sql, args)
@ -183,11 +171,10 @@ func (c *Core) DoCommit(ctx context.Context, in DoCommitInput) (out DoCommitOutp
ctx, span := tr.Start(ctx, string(in.Type), trace.WithSpanKind(trace.SpanKindInternal))
defer span.End()
// Execution cased by type.
// Execution by type.
switch in.Type {
case SqlTypeBegin:
ctx, cancelFuncForTimeout = c.GetCtxTimeout(ctx, ctxTimeoutTypeTrans)
defer cancelFuncForTimeout()
formattedSql = fmt.Sprintf(
`%s (IosolationLevel: %s, ReadOnly: %t)`,
formattedSql, in.TxOptions.Isolation.String(), in.TxOptions.ReadOnly,
@ -199,15 +186,22 @@ func (c *Core) DoCommit(ctx context.Context, in DoCommitInput) (out DoCommitOutp
ctx: context.WithValue(ctx, transactionIdForLoggerCtx, transactionIdGenerator.Add(1)),
master: in.Db,
transactionId: guid.S(),
cancelFunc: cancelFuncForTimeout,
}
ctx = out.Tx.GetCtx()
}
out.RawResult = sqlTx
case SqlTypeTXCommit:
if in.TxCancelFunc != nil {
defer in.TxCancelFunc()
}
err = in.Tx.Commit()
case SqlTypeTXRollback:
if in.TxCancelFunc != nil {
defer in.TxCancelFunc()
}
err = in.Tx.Rollback()
case SqlTypeExecContext: