新增动态大小的安全队列gqueue,并修正grpool关闭时的数量问题

This commit is contained in:
John
2018-01-20 11:09:27 +08:00
parent 70ff9c1ef0
commit 61c067c661
12 changed files with 327 additions and 16 deletions

View File

@ -0,0 +1,54 @@
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package gqueue
import (
"math"
"sync"
"container/list"
)
type IntQueue struct {
mu sync.RWMutex
list *list.List
events chan struct{}
}
func NewIntQueue() *IntQueue {
return &IntQueue{
list : list.New(),
events : make(chan struct{}, math.MaxInt64),
}
}
// 将数据压入队列
func (q *IntQueue) Push(v int) {
q.mu.Lock()
q.list.PushBack(v)
q.mu.Unlock()
q.events <- struct{}{}
}
// 先进先出地从队列取出一项数据,当没有数据可获取时,阻塞等待
func (q *IntQueue) Pop() int {
select {
case <- q.events:
q.mu.Lock()
if elem := q.list.Front(); elem != nil {
item := q.list.Remove(elem).(int)
q.mu.Unlock()
return item
}
q.mu.Unlock()
}
return 0
}
// 获取当前队列大小
func (q *IntQueue) Size() int {
return len(q.events)
}

View File

@ -0,0 +1,55 @@
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// 动态大小的安全队列(dynamic channel).
package gqueue
import (
"math"
"sync"
"container/list"
)
type InterfaceQueue struct {
mu sync.RWMutex
list *list.List
events chan struct{}
}
func NewInterfaceQueue() *InterfaceQueue {
return &InterfaceQueue {
list : list.New(),
events : make(chan struct{}, math.MaxInt64),
}
}
// 将数据压入队列
func (q *InterfaceQueue) Push(v interface{}) {
q.mu.Lock()
q.list.PushBack(v)
q.mu.Unlock()
q.events <- struct{}{}
}
// 先进先出地从队列取出一项数据,当没有数据可获取时,阻塞等待
func (q *InterfaceQueue) Pop() interface{} {
select {
case <- q.events:
q.mu.Lock()
if elem := q.list.Front(); elem != nil {
item := q.list.Remove(elem)
q.mu.Unlock()
return item
}
q.mu.Unlock()
}
return nil
}
// 获取当前队列大小
func (q *InterfaceQueue) Size() int {
return len(q.events)
}

View File

@ -0,0 +1,54 @@
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package gqueue
import (
"math"
"sync"
"container/list"
)
type StringQueue struct {
mu sync.RWMutex
list *list.List
events chan struct{}
}
func NewStringQueue() *StringQueue {
return &StringQueue{
list : list.New(),
events : make(chan struct{}, math.MaxInt64),
}
}
// 将数据压入队列
func (q *StringQueue) Push(v string) {
q.mu.Lock()
q.list.PushBack(v)
q.mu.Unlock()
q.events <- struct{}{}
}
// 先进先出地从队列取出一项数据,当没有数据可获取时,阻塞等待
func (q *StringQueue) Pop() string {
select {
case <- q.events:
q.mu.Lock()
if elem := q.list.Front(); elem != nil {
item := q.list.Remove(elem).(string)
q.mu.Unlock()
return item
}
q.mu.Unlock()
}
return ""
}
// 获取当前队列大小
func (q *StringQueue) Size() int {
return len(q.events)
}

View File

@ -0,0 +1,54 @@
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package gqueue
import (
"math"
"sync"
"container/list"
)
type UintQueue struct {
mu sync.RWMutex
list *list.List
events chan struct{}
}
func NewUintQueue() *UintQueue {
return &UintQueue{
list : list.New(),
events : make(chan struct{}, math.MaxInt64),
}
}
// 将数据压入队列
func (q *UintQueue) Push(v uint) {
q.mu.Lock()
q.list.PushBack(v)
q.mu.Unlock()
q.events <- struct{}{}
}
// 先进先出地从队列取出一项数据,当没有数据可获取时,阻塞等待
func (q *UintQueue) Pop() uint {
select {
case <- q.events:
q.mu.Lock()
if elem := q.list.Front(); elem != nil {
item := q.list.Remove(elem).(uint)
q.mu.Unlock()
return item
}
q.mu.Unlock()
}
return 0
}
// 获取当前队列大小
func (q *UintQueue) Size() int {
return len(q.events)
}

View File

@ -16,6 +16,7 @@ import (
"gitee.com/johng/gf/g/util/gconv"
"gitee.com/johng/gf/g/encoding/gxml"
"gitee.com/johng/gf/g/encoding/gyaml"
"gitee.com/johng/gf/g/encoding/gtoml"
)
// json解析结果存放数组
@ -65,25 +66,32 @@ func Load (path string) (*Json, error) {
return LoadContent(data, gfile.Ext(path))
}
// 支持的配置文件格式xml, json, yml
// 支持的配置文件格式xml, json, yaml/yml, toml
func LoadContent (data []byte, t string) (*Json, error) {
var err error
var result interface{}
switch t {
case "xml": fallthrough
case "xml": fallthrough
case ".xml":
data, err = gxml.ToJson(data)
if err != nil {
return nil, err
}
case "yml": fallthrough
case "yaml": fallthrough
case ".yml": fallthrough
case "yml": fallthrough
case "yaml": fallthrough
case ".yml": fallthrough
case ".yaml":
data, err = gyaml.ToJson(data)
if err != nil {
return nil, err
}
case "toml": fallthrough
case ".toml":
data, err = gtoml.ToJson(data)
if err != nil {
return nil, err
}
}
if err := json.Unmarshal(data, &result); err != nil {
return nil, err
@ -252,6 +260,10 @@ func (p *Json) ToYaml() ([]byte, error) {
return gyaml.Encode(*(p.value))
}
func (p *Json) ToToml() ([]byte, error) {
return gtoml.Encode(*(p.value))
}
// 判断所给字符串是否为数字
func isNumeric(s string) bool {
for i := 0; i < len(s); i++ {

42
g/encoding/gtoml/gtoml.go Normal file
View File

@ -0,0 +1,42 @@
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// TOML
package gtoml
import (
"bytes"
"encoding/json"
"github.com/BurntSushi/toml"
)
func Encode(v interface{}) ([]byte, error) {
buffer := bytes.NewBuffer(nil)
if err := toml.NewEncoder(buffer).Encode(v); err != nil {
return nil, err
}
return buffer.Bytes(), nil
}
func Decode(v []byte) (interface{}, error) {
var result interface{}
if err := toml.Unmarshal(v, &result); err != nil {
return nil, err
}
return result, nil
}
func DecodeTo(v []byte, result interface{}) error {
return toml.Unmarshal(v, result)
}
func ToJson(v []byte) ([]byte, error) {
if r, err := Decode(v); err != nil {
return nil, err
} else {
return json.Marshal(r)
}
}

View File

@ -13,9 +13,12 @@ func Encode(v interface{}) ([]byte, error) {
return yaml.Marshal(v)
}
func Decode(v []byte) error {
func Decode(v []byte) (interface{}, error) {
var result interface{}
return yaml.Unmarshal(v, &result)
if err := yaml.Unmarshal(v, &result); err != nil {
return nil, err
}
return result, nil
}
func DecodeTo(v []byte, result interface{}) error {

View File

@ -4,18 +4,18 @@
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// 单例对象管理.
// 框架内置了一些核心对象并且可以通过Set和Get方法实现IoC以及对内置核心对象的自定义替换
// 单例对象管理(耦合了一些框架核心对象获取方法).
// 框架内置了一些核心对象获取方法并且可以通过Set和Get方法实现IoC以及对内置核心对象的自定义替换
package gins
import (
"strconv"
"gitee.com/johng/gf/g/os/gcfg"
"gitee.com/johng/gf/g/os/gcmd"
"gitee.com/johng/gf/g/os/glog"
"gitee.com/johng/gf/g/os/genv"
"gitee.com/johng/gf/g/os/gview"
"gitee.com/johng/gf/g/os/gfile"
"gitee.com/johng/gf/g/frame/gcfg"
"gitee.com/johng/gf/g/database/gdb"
"gitee.com/johng/gf/g/container/gmap"
)

View File

@ -5,7 +5,7 @@
// You can obtain one at https://gitee.com/johng/gf.
// 配置管理.
// 配置文件格式支持json, xml, yml
// 配置文件格式支持json, xml, toml, yaml/yml
package gcfg
import (

View File

@ -56,7 +56,7 @@ func New(expire int, sizes...int) *Pool {
funcs : glist.NewSafeList(),
freeEvents : make(chan struct{}, math.MaxUint32),
funcEvents : make(chan struct{}, math.MaxUint32),
stopEvents : make(chan struct{}, 1),
stopEvents : make(chan struct{}, 2),
}
p.startWorkLoop()
p.startClearLoop()

32
geg/container/gqueue.go Normal file
View File

@ -0,0 +1,32 @@
package main
import (
"fmt"
"gitee.com/johng/gf/g/os/gtime"
"gitee.com/johng/gf/g/container/gqueue"
"time"
)
func main() {
t := gtime.Microsecond()
q := gqueue.NewInterfaceQueue()
fmt.Println("queue creation costs(μs):", gtime.Microsecond() - t)
// 每隔2秒异步打印出当前队列的大小
gtime.SetInterval(2*time.Second, func() bool {
fmt.Println("queue size:", q.Size())
return true
})
// push10条数据
for i := 0; i < 10; i++ {
q.Push(i)
fmt.Println("push:", i)
}
// 每隔1秒pop1条数据
for {
time.Sleep(time.Second)
fmt.Println(" pop:", q.Pop())
}
}

View File

@ -2,11 +2,16 @@ package main
import (
"fmt"
"gitee.com/johng/gf/g/encoding/gjson"
)
func main() {
j, _ := gjson.Load("/home/john/Workspace/Go/GOPATH/src/gitee.com/johng/gf/geg/frame/config.json")
c, _ := j.ToXmlIndent("config")
fmt.Println(string(c))
c1 := make(chan int, 2)
c2 := make(chan int, 5)
c1 <- 1
c1 <- 2
c2 = c1
c2 <- 3
fmt.Println(<-c2)
fmt.Println(<-c2)
fmt.Println(<-c2)
}