game_sync/etcd/client.go

261 lines
6.9 KiB
Go

package etcd
import (
"context"
"time"
"go.etcd.io/etcd/client/v3"
"mongo.games.com/goserver/core"
"mongo.games.com/goserver/core/basic"
"mongo.games.com/goserver/core/logger"
"mongo.games.com/game/model"
)
/*
Common etcd operations and data watching.
*/

// InitFunc is the first half of a watch registration. InitAndWatch calls it
// before watching starts and treats the returned value as the last revision
// that has already been handled.
type InitFunc func() int64

// WatchFunc is the second half of a watch registration. InitAndWatch calls it
// with a context and the revision to start watching from (the InitFunc result
// plus one).
type WatchFunc func(context.Context, int64)

// FuncPair bundles an InitFunc with its WatchFunc so the pair can be stored
// and replayed later.
type FuncPair struct {
	initFunc  InitFunc
	watchFunc WatchFunc
}
// Client wraps an etcd v3 client together with the init/watch function pairs
// registered on it.
type Client struct {
	// cli is the underlying etcd client; it is assigned by Open and stays nil
	// until then.
	cli *clientv3.Client
	// functions records every pair passed to AddFunc or InitAndWatch so that
	// ReInitAndWatchAll can run them again.
	functions []FuncPair
	// closed is set to true by Close and reset to false by a successful Open.
	// It is a plain bool that the GoWatch goroutine reads without any
	// synchronization.
	closed bool
}
// IsClosed reports whether Close has been called since the last successful
// Open. A Client on which neither has been called reports false.
func (this *Client) IsClosed() bool {
	return this.closed
}
// Ctx returns the context of the underlying etcd client. When no client has
// been assigned yet it falls back to context.TODO().
func (this *Client) Ctx() context.Context {
	if this.cli == nil {
		return context.TODO()
	}
	return this.cli.Ctx()
}
// Open creates the underlying etcd client.
//
// etcdUrl lists the endpoints to connect to, userName/passWord are the
// credentials and dialTimeout bounds the initial dial. Keep-alive pings are
// sent every 5 seconds with a 30 second timeout.
//
// On failure the error is logged and returned, and the closed flag is left
// untouched. Note that this.cli is overwritten with whatever clientv3.New
// returned in either case. On success the closed flag is cleared.
func (this *Client) Open(etcdUrl []string, userName, passWord string, dialTimeout time.Duration) error {
	cfg := clientv3.Config{
		Endpoints:            etcdUrl,
		Username:             userName,
		Password:             passWord,
		DialTimeout:          dialTimeout,
		DialKeepAliveTime:    5 * time.Second,
		DialKeepAliveTimeout: 30 * time.Second,
	}

	var err error
	if this.cli, err = clientv3.New(cfg); err != nil {
		logger.Logger.Errorf("EtcdClient.open(%v) err:%v", etcdUrl, err)
		return err
	}

	this.closed = false
	return nil
}
// Close marks the Client as closed and shuts down the underlying etcd client
// if there is one, returning that client's Close error. It returns nil when
// no client was ever opened.
func (this *Client) Close() error {
	logger.Logger.Warn("EtcdClient.close")
	this.closed = true

	if this.cli == nil {
		return nil
	}
	return this.cli.Close()
}
// PutValue stores value under key. A failure is logged as a warning and the
// response and error from the etcd client are returned unchanged.
func (this *Client) PutValue(key, value string) (*clientv3.PutResponse, error) {
	putResp, putErr := this.cli.Put(context.TODO(), key, value)
	if putErr == nil {
		return putResp, nil
	}
	logger.Logger.Warnf("EtcdClient.PutValue(%v,%v) error:%v", key, value, putErr)
	return putResp, putErr
}
// GetValue looks up a single key. A failure is logged as a warning and the
// response and error from the etcd client are returned unchanged.
func (this *Client) GetValue(key string) (*clientv3.GetResponse, error) {
	getResp, getErr := this.cli.Get(context.TODO(), key)
	if getErr == nil {
		return getResp, nil
	}
	logger.Logger.Warnf("EtcdClient.GetValue(%v) error:%v", key, getErr)
	return getResp, getErr
}
// DelValue deletes a single key. The returned response reports how many
// entries were removed. A failure is logged as a warning and the response and
// error from the etcd client are returned unchanged.
func (this *Client) DelValue(key string) (*clientv3.DeleteResponse, error) {
	delResp, delErr := this.cli.Delete(context.TODO(), key)
	if delErr == nil {
		return delResp, nil
	}
	logger.Logger.Warnf("EtcdClient.DelValue(%v) error:%v", key, delErr)
	return delResp, delErr
}
// DelValueWithPrefix deletes every key that starts with prefix. A failure is
// logged as a warning and the response and error from the etcd client are
// returned unchanged.
func (this *Client) DelValueWithPrefix(prefix string) (*clientv3.DeleteResponse, error) {
	delResp, delErr := this.cli.Delete(context.TODO(), prefix, clientv3.WithPrefix())
	if delErr == nil {
		return delResp, nil
	}
	logger.Logger.Warnf("EtcdClient.DelValueWithPrefix(%v) error:%v", prefix, delErr)
	return delResp, delErr
}
// GetValueWithPrefix fetches every key that starts with prefix. A failure is
// logged as a warning and the response and error from the etcd client are
// returned unchanged.
func (this *Client) GetValueWithPrefix(prefix string) (*clientv3.GetResponse, error) {
	resp, err := this.cli.Get(context.TODO(), prefix, clientv3.WithPrefix())
	if err != nil {
		// The log tag now spells the method name correctly ("With", not
		// "WIth") so that searching the logs by method name finds it.
		logger.Logger.Warnf("EtcdClient.GetValueWithPrefix(%v) error:%v", prefix, err)
	}
	return resp, err
}
// WatchWithPrefix opens a watch on every key that starts with prefix and
// returns its channel. The watch asks for a created notification and, when
// revision is greater than zero, starts from that revision.
//
// The watch runs on a require-leader context derived from
// context.Background(), not on a caller-supplied context. It returns nil when
// no etcd client has been opened.
func (this *Client) WatchWithPrefix(prefix string, revision int64) clientv3.WatchChan {
	if this.cli == nil {
		return nil
	}

	watchOpts := []clientv3.OpOption{clientv3.WithPrefix(), clientv3.WithCreatedNotify()}
	if revision > 0 {
		watchOpts = append(watchOpts, clientv3.WithRev(revision))
	}

	watchCtx := clientv3.WithRequireLeader(context.Background())
	return this.cli.Watch(watchCtx, prefix, watchOpts...)
}
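// Compact compacts etcd history up to the latest revision and then defragments every endpoint.
// It is currently disabled; the commented-out implementation is kept below for reference.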
//func (this *Client) Compact() {
// if this.closed {
// return
// }
//
// resp, err := this.GetValue("@@@GET_LASTEST_REVISION@@@")
// if err == nil {
// ctx, _ := context.WithCancel(this.cli.Ctx())
// start := time.Now()
// compactResponse, err := this.cli.Compact(ctx, resp.Header.Revision, clientv3.WithCompactPhysical())
// if err == nil {
// logger.Logger.Infof("EtcdClient.Compact From %v CompactResponse %v take %v", resp.Header.Revision, compactResponse.Header, time.Now().Sub(start))
// } else {
// logger.Logger.Errorf("EtcdClient.Compact From %v CompactResponse:%v take:%v err:%v", resp.Header.Revision, compactResponse, time.Now().Sub(start), err)
// }
// endpoints := this.cli.Endpoints()
// for _, endpoint := range endpoints {
// ctx1, _ := context.WithCancel(this.cli.Ctx())
// start := time.Now()
// defragmentResponse, err := this.cli.Defragment(ctx1, endpoint)
// if err == nil {
// logger.Logger.Infof("EtcdClient.Defragment %v,%v take %v", endpoint, defragmentResponse.Header, time.Now().Sub(start))
// } else {
// logger.Logger.Errorf("EtcdClient.Defragment DefragmentResponse:%v take:%v err:%v", defragmentResponse, time.Now().Sub(start), err)
// }
// }
// }
//}
// AddFunc registers an init/watch pair without running it. The pair is only
// recorded, so a later ReInitAndWatchAll will execute it.
func (this *Client) AddFunc(initFunc InitFunc, watchFunc WatchFunc) {
	this.functions = append(this.functions, FuncPair{
		initFunc:  initFunc,
		watchFunc: watchFunc,
	})
}
// ReInitAndWatchAll runs every registered init/watch pair again, in
// registration order. It does nothing when the Client is closed.
func (this *Client) ReInitAndWatchAll() {
	if this.closed {
		return
	}

	// InitAndWatch appends each pair back onto this.functions, so detach the
	// current list first to avoid registering everything twice.
	registered := this.functions
	this.functions = nil
	for _, pair := range registered {
		this.InitAndWatch(pair.initFunc, pair.watchFunc)
	}
}
// InitAndWatch registers an init/watch pair and runs it immediately.
//
// initFunc is called first and its result is taken as the last revision
// already handled. watchFunc is then called with the etcd client's context
// and that revision plus one as the starting point. The pair is also recorded
// so ReInitAndWatchAll can run it again.
//
// The Client must have been opened: this.cli is dereferenced unconditionally.
func (this *Client) InitAndWatch(initFunc InitFunc, watchFunc WatchFunc) {
	this.functions = append(this.functions, FuncPair{
		initFunc:  initFunc,
		watchFunc: watchFunc,
	})

	lastRevision := initFunc()

	// Hand the client's context to watchFunc directly. The previous code
	// wrapped it in context.WithCancel and threw the cancel func away, which
	// go vet reports as "lostcancel": the child could only ever be canceled
	// through its parent, so it added bookkeeping and no extra control.
	// A deferred cancel is not an option because watchFunc may keep using
	// the context from a goroutine after it returns.
	watchFunc(this.cli.Ctx(), lastRevision+1)
}
// GoWatch watches every key under prefix in a background goroutine and hands
// each response that carries events to f.
//
// Parameters:
//   - ctx: canceling it stops the goroutine.
//   - revision: revision to start watching from. It is advanced to the header
//     revision of every response received, so a re-opened watch resumes from
//     the most recent revision seen.
//   - prefix: key prefix to watch.
//   - f: callback for a response. It is not called on this goroutine; it is
//     wrapped in a command and sent to core.CoreObject().
//
// Whenever the watch channel is closed or the watch is canceled by the
// server, the watch is re-opened after a one second pause, for as long as the
// Client is not closed and ctx is not canceled. A panic inside the goroutine
// is recovered and logged, after which the goroutine exits.
//
// NOTE(review): the watch itself is opened through WatchWithPrefix, which
// uses context.Background(); canceling ctx stops this goroutine but does not
// cancel that watch.
func (this *Client) GoWatch(ctx context.Context, revision int64, prefix string, f func(res clientv3.WatchResponse) error) {
	go func() {
		defer func() {
			if err := recover(); err != nil {
				logger.Logger.Errorf("etcd watch WithPrefix(%v) panic:%v", prefix, err)
			}
			logger.Logger.Warnf("etcd watch WithPrefix(%v) quit!!!", prefix)
		}()

		var times int64
		for !this.closed {
			// Never open another watch once the caller has canceled; nothing
			// would be left to read from it.
			if ctx.Err() != nil {
				return
			}

			times++
			logger.Logger.Warnf("etcd watch WithPrefix(%v) base revision %v start[%v]!!!", prefix, revision, times)
			rch := this.WatchWithPrefix(prefix, revision)

		receive:
			for {
				select {
				case <-ctx.Done():
					return
				case wresp, ok := <-rch:
					if !ok {
						logger.Logger.Warnf("etcd watch WithPrefix(%v) be closed", prefix)
						break receive
					}
					// Remember the progress before looking at the error state
					// so that a restart resumes from the newest revision.
					if wresp.Header.Revision > revision {
						revision = wresp.Header.Revision
					}
					if wresp.Canceled {
						logger.Logger.Warnf("etcd watch WithPrefix(%v) be closed, reason:%v", prefix, wresp.Err())
						break receive
					}
					if err := wresp.Err(); err != nil {
						logger.Logger.Warnf("etcd watch WithPrefix(%v) err:%v", prefix, err)
						continue
					}
					if !model.GameParamData.UseEtcd {
						continue
					}
					if len(wresp.Events) == 0 {
						continue
					}
					logger.Logger.Tracef("@goWatch %v changed, header:%#v", prefix, wresp.Header)
					obj := core.CoreObject()
					if obj != nil {
						// Bind this response to its own variable so the
						// deferred command sees exactly this response.
						res := wresp
						obj.SendCommand(basic.CommandWrapper(func(*basic.Object) error {
							return f(res)
						}), true)
					}
				}
			}

			// Back off before re-opening the watch, but wake up immediately
			// if the caller cancels in the meantime.
			select {
			case <-ctx.Done():
				return
			case <-time.After(time.Second):
			}
		}
	}()
}