add package gkvdb; add kvdb storage feature for ghttp.Server

This commit is contained in:
John
2019-08-22 21:04:30 +08:00
parent 51a156420d
commit 7dffd9d1ff
20 changed files with 795 additions and 113 deletions

50
database/gkvdb/gkvdb.go Normal file
View File

@ -0,0 +1,50 @@
// Copyright 2019 gf Author(https://github.com/gogf/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.
// Package gkvdb provides a lightweight, embeddable and persistent key-value database.
package gkvdb
import (
"github.com/dgraph-io/badger"
"github.com/gogf/gf/container/gmap"
)
type Options = badger.Options
const (
// Default instance name.
DEFAULT_NAME = "default"
)
var (
// Instance map
instances = gmap.NewStrAnyMap(true)
)
// DefaultOptions returns the default options for gkvdb.
func DefaultOptions(path string) Options {
options := badger.DefaultOptions(path)
options.Logger = nil
options.SyncWrites = false
return options
}
// Instance returns an instance of DB with specified name.
// The <name> param is unnecessary, if <name> is not passed,
// it returns a instance with default name.
func Instance(name ...string) *DB {
instanceName := DEFAULT_NAME
if len(name) > 0 && name[0] != "" {
instanceName = name[0]
}
v := instances.GetOrSetFuncLock(instanceName, func() interface{} {
return New()
})
if v != nil {
return v.(*DB)
}
return nil
}

149
database/gkvdb/gkvdb_db.go Normal file
View File

@ -0,0 +1,149 @@
// Copyright 2019 gf Author(https://github.com/gogf/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.
package gkvdb
import (
"errors"
"time"
"github.com/gogf/gf/os/gfile"
"github.com/dgraph-io/badger"
)
type DB struct {
options Options
badger *badger.DB
}
// New creates and returns a new db object.
func New(options ...Options) *DB {
if len(options) > 0 {
return &DB{options: options[0]}
}
return &DB{options: DefaultOptions("")}
}
// init does lazy initialization for db.
func (db *DB) init() (err error) {
if db.badger != nil {
return nil
}
if !gfile.Exists(db.options.Dir) {
err = gfile.Mkdir(db.options.Dir)
if err != nil {
return
}
}
db.badger, err = badger.Open(db.options)
return
}
// Options return the options of current db object.
func (db *DB) Options() *Options {
return &db.options
}
// SetOptions sets the options for db.
func (db *DB) SetOptions(options Options) error {
if db.badger != nil {
return errors.New("options cannot be changed after db is initialized")
}
db.options = options
return nil
}
// SetPath sets the storage folder path for db.
func (db *DB) SetPath(path string) error {
if db.badger != nil {
return errors.New("options cannot be changed after db is initialized")
}
db.options.Dir = path
db.options.ValueDir = path
return nil
}
// Size returns the data count of current db.
func (db *DB) Size() int64 {
if err := db.init(); err != nil {
return 0
}
lsm, vlog := db.badger.Size()
return lsm + vlog
}
// Set sets <key>-<value> pair data to current db with <ttl>.
// The <ttl> is optional, which is not expired in default.
func (db *DB) Set(key []byte, value []byte, ttl ...time.Duration) (err error) {
if err := db.init(); err != nil {
return err
}
tx := db.Begin(true)
defer tx.Commit()
return tx.Set(key, value, ttl...)
}
// Get returns the value with given key.
// It returns nil if <key> is not found in the db.
func (db *DB) Get(key []byte) (value []byte) {
if err := db.init(); err != nil {
return
}
tx := db.Begin(false)
defer tx.Rollback()
return tx.Get(key)
}
// Delete removed data specified by <key> from current db.
func (db *DB) Delete(key []byte) error {
if err := db.init(); err != nil {
return err
}
tx := db.Begin(true)
defer tx.Commit()
return tx.Delete(key)
}
// Close closes the db.
func (db *DB) Close() error {
if db.badger == nil {
return nil
}
return db.badger.Close()
}
// Iterate is alias of IterateAsc.
// See IterateAsc.
func (db *DB) Iterate(prefix []byte, f func(key, value []byte) bool) {
db.IterateAsc(prefix, f)
}
// IteratorAsc iterates the db in ascending order
// with given callback function <f> starting from <seek>.
// If <seek> is nil it iterates from the beginning of the db.
// If <f> returns true, then it continues iterating; or false to stop.
func (db *DB) IterateAsc(prefix []byte, f func(key, value []byte) bool) {
if err := db.init(); err != nil {
return
}
tx := db.Begin(false)
defer tx.Rollback()
tx.IterateAsc(prefix, f)
}
// IteratorDesc iterates the db in descending order
// with given callback function <f> starting from <seek>.
// If <prefix> is nil it iterates from the beginning of the db.
// If <f> returns true, then it continues iterating; or false to stop.
func (db *DB) IterateDesc(prefix []byte, f func(key, value []byte) bool) {
if err := db.init(); err != nil {
return
}
tx := db.Begin(false)
defer tx.Rollback()
tx.IterateDesc(prefix, f)
}

116
database/gkvdb/gkvdb_tx.go Normal file
View File

@ -0,0 +1,116 @@
// Copyright 2019 gf Author(https://github.com/gogf/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.
package gkvdb
import (
"time"
"github.com/dgraph-io/badger"
)
// TX is the transaction object for db.
type TX struct {
db *DB
txn *badger.Txn
}
// Begin starts the transaction for db.
func (db *DB) Begin(update bool) *TX {
if err := db.init(); err != nil {
return nil
}
return &TX{
db: db,
txn: db.badger.NewTransaction(update),
}
}
// Commit commits the changes and close the transaction.
func (tx *TX) Commit() error {
return tx.txn.Commit()
}
// Rollback discards the changes and close the transaction.
func (tx *TX) Rollback() {
tx.txn.Discard()
}
// Set sets <key>-<value> pair data to current db with <ttl> in this transaction.
// The <ttl> is optional, which is not expired in default.
func (tx *TX) Set(key []byte, value []byte, ttl ...time.Duration) error {
if len(ttl) > 0 && ttl[0] > 0 {
return tx.txn.SetEntry(badger.NewEntry(key, value).WithTTL(ttl[0]))
}
return tx.txn.Set(key, value)
}
// Get returns the value with given key in this transaction.
// It returns nil if <key> is not found in the db.
func (tx *TX) Get(key []byte) (value []byte) {
item, err := tx.txn.Get(key)
if err != nil {
return nil
}
if item.IsDeletedOrExpired() {
return nil
}
value, _ = item.ValueCopy(nil)
return
}
// Delete removed data specified by <key> from current db in this transaction.
func (tx *TX) Delete(key []byte) error {
return tx.txn.Delete(key)
}
// Iterate is alias of IterateAsc.
// See IterateAsc.
func (tx *TX) Iterate(prefix []byte, f func(key, value []byte) bool) {
tx.IterateAsc(prefix, f)
}
// IteratorAsc iterates the db in ascending order
// with given callback function <f> starting from <prefix>.
// If <seek> is nil it iterates from the beginning of the db.
// If <f> returns true, then it continues iterating; or false to stop.
func (tx *TX) IterateAsc(prefix []byte, f func(key, value []byte) bool) {
tx.doIterate(false, prefix, f)
}
// IteratorDesc iterates the db in descending order
// with given callback function <f> starting from <prefix>.
// If <seek> is nil it iterates from the beginning of the db.
// If <f> returns true, then it continues iterating; or false to stop.
func (tx *TX) IterateDesc(prefix []byte, f func(key, value []byte) bool) {
tx.doIterate(true, prefix, f)
}
func (tx *TX) doIterate(reverse bool, prefix []byte, f func(key, value []byte) bool) {
options := badger.DefaultIteratorOptions
if prefix != nil {
options.Prefix = prefix
}
options.Reverse = reverse
options.PrefetchSize = 10
options.PrefetchValues = true
it := tx.txn.NewIterator(options)
defer it.Close()
var k, v []byte
var err error
var item *badger.Item
for it.Rewind(); it.Valid(); it.Next() {
item = it.Item()
k = item.Key()
v, err = item.ValueCopy(nil)
if err != nil {
return
}
if !f(k, v) {
return
}
}
}

View File

@ -0,0 +1,114 @@
// Copyright 2019 gf Author(https://github.com/gogf/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.
package gkvdb_test
import (
"strings"
"testing"
"time"
"github.com/gogf/gf/frame/g"
"github.com/gogf/gf/container/garray"
"github.com/gogf/gf/os/gtime"
"github.com/gogf/gf/util/gconv"
"github.com/gogf/gf/database/gkvdb"
"github.com/gogf/gf/test/gtest"
)
func Test_New(t *testing.T) {
gtest.Case(t, func() {
path := "/tmp/gkvdb/" + gconv.String(gtime.Nanosecond())
key := []byte("key")
value := []byte("value")
db := gkvdb.Instance()
db.SetPath(path)
err := db.Set(key, value)
gtest.Assert(err, nil)
gtest.Assert(db.Get(key), value)
gtest.Assert(db.Delete(key), nil)
gtest.Assert(db.Get(key), nil)
})
}
func Test_Set(t *testing.T) {
gtest.Case(t, func() {
path := "/tmp/gkvdb/" + gconv.String(gtime.Nanosecond())
key := []byte("key")
value := []byte("value")
db := gkvdb.Instance()
db.SetPath(path)
err := db.Set(key, value, 100*time.Millisecond)
gtest.Assert(err, nil)
gtest.Assert(db.Get(key), value)
time.Sleep(200 * time.Millisecond)
gtest.Assert(db.Get(key), nil)
})
}
func Test_Iterate(t *testing.T) {
gtest.Case(t, func() {
path := "/tmp/gkvdb/" + gconv.String(gtime.Nanosecond())
db := gkvdb.Instance()
db.SetPath(path)
strArray := garray.NewSortedStringArray()
strArrayReverse := garray.NewSortedStringArrayComparator(func(a, b string) int {
switch strings.Compare(a, b) {
case 0:
return 0
case 1:
return -1
case -1:
return 1
}
return 0
})
for i := 1; i <= 10; i++ {
key := []byte("key_" + gconv.String(i))
strArray.Add(string(key))
strArrayReverse.Add(string(key))
db.Set(key, key)
}
array := garray.New()
db.Iterate(nil, func(key, value []byte) bool {
array.Append(string(key))
return true
})
gtest.Assert(array.Slice(), strArray.Slice())
array = garray.New()
db.IterateDesc(nil, func(key, value []byte) bool {
array.Append(string(key))
return true
})
gtest.Assert(array.Slice(), strArrayReverse.Slice())
array = garray.New()
db.Iterate([]byte("key_1"), func(key, value []byte) bool {
array.Append(key)
return true
})
gtest.Assert(array.Slice(), g.Slice{[]byte("key_1"), []byte("key_10")})
array = garray.New()
db.IterateAsc([]byte("key_1"), func(key, value []byte) bool {
array.Append(key)
return true
})
gtest.Assert(array.Slice(), g.Slice{[]byte("key_1"), []byte("key_10")})
})
}

View File

@ -22,11 +22,6 @@ import (
"github.com/gomodule/redigo/redis"
)
const (
gDEFAULT_POOL_IDLE_TIMEOUT = 60 * time.Second
gDEFAULT_POOL_MAX_LIFE_TIME = 60 * time.Second
)
// Redis client.
type Redis struct {
pool *redis.Pool // Underlying connection pool.
@ -56,9 +51,12 @@ type PoolStats struct {
redis.PoolStats
}
const (
gDEFAULT_POOL_IDLE_TIMEOUT = 60 * time.Second
gDEFAULT_POOL_MAX_LIFE_TIME = 60 * time.Second
)
var (
// Instance map
instances = gmap.NewStrAnyMap(true)
// Pool map.
pools = gmap.NewStrAnyMap(true)
)
@ -106,28 +104,6 @@ func New(config Config) *Redis {
}
}
// Instance returns an instance of redis client with specified group.
// The <group> param is unnecessary, if <group> is not passed,
// it returns a redis instance with default group.
func Instance(name ...string) *Redis {
group := DEFAULT_GROUP_NAME
if len(name) > 0 && name[0] != "" {
group = name[0]
}
v := instances.GetOrSetFuncLock(group, func() interface{} {
if config, ok := GetConfig(group); ok {
r := New(config)
r.group = group
return r
}
return nil
})
if v != nil {
return v.(*Redis)
}
return nil
}
// Close closes the redis connection pool,
// it will release all connections reserved by this pool.
// It is not necessary to call Close manually.

View File

@ -0,0 +1,36 @@
// Copyright 2017 gf Author(https://github.com/gogf/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.
package gredis
import "github.com/gogf/gf/container/gmap"
var (
// Instance map
instances = gmap.NewStrAnyMap(true)
)
// Instance returns an instance of redis client with specified group.
// The <name> param is unnecessary, if <name> is not passed,
// it returns a redis instance with default configuration group.
func Instance(name ...string) *Redis {
group := DEFAULT_GROUP_NAME
if len(name) > 0 && name[0] != "" {
group = name[0]
}
v := instances.GetOrSetFuncLock(group, func() interface{} {
if config, ok := GetConfig(group); ok {
r := New(config)
r.group = group
return r
}
return nil
})
if v != nil {
return v.(*Redis)
}
return nil
}