修复gkafka异步发送消息问题,改进gfile常量管理

This commit is contained in:
john
2018-10-23 16:56:25 +08:00
parent 695140a1ab
commit 1028ffa49f
4 changed files with 33 additions and 27 deletions

View File

@ -8,6 +8,7 @@
package gkafka
import (
"gitee.com/johng/gf/g/os/glog"
"time"
"strings"
"gitee.com/johng/gf/third/github.com/Shopify/sarama"
@ -207,19 +208,19 @@ func (client *Client) AsyncSend(message *Message) error {
return err
} else {
client.asyncProducer = p
//go func(p sarama.AsyncProducer) {
// errors := p.Errors()
// success := p.Successes()
// for {
// select {
// case err := <-errors:
// if err != nil {
// glog.Error(err)
// }
// case <-success:
// }
// }
//}(client.asyncProducer)
go func(p sarama.AsyncProducer) {
errors := p.Errors()
success := p.Successes()
for {
select {
case err := <-errors:
if err != nil {
glog.Error(err)
}
case <-success:
}
}
}(client.asyncProducer)
}
}

View File

@ -27,7 +27,9 @@ import (
// 文件分隔符
const (
Separator = string(filepath.Separator)
Separator = string(filepath.Separator)
// 默认的文件打开权限
gDEFAULT_PERM = 0666
)
var (
@ -63,7 +65,7 @@ func Create(path string) error {
// 打开文件(os.O_RDWR|os.O_CREATE, 0666)
func Open(path string) (*os.File, error) {
f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666)
f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, gDEFAULT_PERM)
if err != nil {
return nil, err
}
@ -72,7 +74,7 @@ func Open(path string) (*os.File, error) {
// 打开文件(带flag)
func OpenWithFlag(path string, flag int) (*os.File, error) {
f, err := os.OpenFile(path, flag, 0666)
f, err := os.OpenFile(path, flag, gDEFAULT_PERM)
if err != nil {
return nil, err
}
@ -201,7 +203,7 @@ func Remove(path string) error {
// 文件是否可读
func IsReadable(path string) bool {
result := true
file, err := os.OpenFile(path, os.O_RDONLY, 0666)
file, err := os.OpenFile(path, os.O_RDONLY, gDEFAULT_PERM)
if err != nil {
result = false
}
@ -223,7 +225,7 @@ func IsWritable(path string) bool {
}
} else {
// 如果是文件,那么判断文件是否可打开
file, err := os.OpenFile(path, os.O_WRONLY, 0666)
file, err := os.OpenFile(path, os.O_WRONLY, gDEFAULT_PERM)
if err != nil {
result = false
}

View File

@ -44,7 +44,7 @@ func putContents(path string, data []byte, flag int, perm os.FileMode) error {
}
}
// 创建/打开文件使用文件指针池默认60秒
f, err := gfpool.OpenFile(path, flag, perm, 60000)
f, err := gfpool.OpenFile(path, flag, perm, gFILE_POOL_EXPIRE*1000)
if err != nil {
return err
}
@ -64,22 +64,22 @@ func Truncate(path string, size int) error {
// (文本)写入文件内容
func PutContents(path string, content string) error {
return putContents(path, []byte(content), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
return putContents(path, []byte(content), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, gDEFAULT_PERM)
}
// (文本)追加内容到文件末尾
func PutContentsAppend(path string, content string) error {
return putContents(path, []byte(content), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
return putContents(path, []byte(content), os.O_WRONLY|os.O_CREATE|os.O_APPEND, gDEFAULT_PERM)
}
// (二进制)写入文件内容
func PutBinContents(path string, content []byte) error {
return putContents(path, content, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
return putContents(path, content, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, gDEFAULT_PERM)
}
// (二进制)追加内容到文件末尾
func PutBinContentsAppend(path string, content []byte) error {
return putContents(path, content, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
return putContents(path, content, os.O_WRONLY|os.O_CREATE|os.O_APPEND, gDEFAULT_PERM)
}
// 获得文件内容下一个指定字节的位置
@ -103,7 +103,7 @@ func GetNextCharOffset(file *os.File, char byte, start int64) int64 {
// 获得文件内容下一个指定字节的位置
func GetNextCharOffsetByPath(path string, char byte, start int64) int64 {
if f, err := gfpool.Open(path, os.O_RDONLY, 0666, gFILE_POOL_EXPIRE*1000); err == nil {
if f, err := gfpool.Open(path, os.O_RDONLY, gDEFAULT_PERM, gFILE_POOL_EXPIRE*1000); err == nil {
defer f.Close()
return GetNextCharOffset(&f.File, char, start)
} else {
@ -122,7 +122,7 @@ func GetBinContentsTilChar(file *os.File, char byte, start int64) ([]byte, int64
// 获得文件内容直到下一个指定字节的位置(返回值包含该位置字符内容)
func GetBinContentsTilCharByPath(path string, char byte, start int64) ([]byte, int64) {
if f, err := gfpool.Open(path, os.O_RDONLY, 0666, gFILE_POOL_EXPIRE*1000); err == nil {
if f, err := gfpool.Open(path, os.O_RDONLY, gDEFAULT_PERM, gFILE_POOL_EXPIRE*1000); err == nil {
defer f.Close()
return GetBinContentsTilChar(&f.File, char, start)
} else {
@ -142,7 +142,7 @@ func GetBinContentsByTwoOffsets(file *os.File, start int64, end int64) []byte {
// 获得文件内容中两个offset之间的内容 [start, end)
func GetBinContentsByTwoOffsetsByPath(path string, start int64, end int64) []byte {
if f, err := gfpool.Open(path, os.O_RDONLY, 0666, gFILE_POOL_EXPIRE*1000); err == nil {
if f, err := gfpool.Open(path, os.O_RDONLY, gDEFAULT_PERM, gFILE_POOL_EXPIRE*1000); err == nil {
defer f.Close()
return GetBinContentsByTwoOffsets(&f.File, start, end)
} else {

View File

@ -2,6 +2,7 @@ package main
import (
"fmt"
"gitee.com/johng/gf/g/container/gvar"
)
func test() {
@ -10,5 +11,7 @@ func test() {
}
func main() {
test()
var v *gvar.Var
//v := new(gvar.Var)
fmt.Println(v.String())
}