game_sync/etcd/manager.go

124 lines
3.3 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package etcd
import (
"context"
"reflect"
"time"
"go.etcd.io/etcd/client/v3"
"mongo.games.com/goserver/core/logger"
"mongo.games.com/game/common"
"mongo.games.com/game/proto"
)
// mgr is the package-level Manager used by the package-level Register and
// Reset helpers. It starts out wrapping a zero-value Client.
var mgr = &Manager{
	Client: &Client{},
}
// Manager embeds *Client, so the Client's methods are promoted onto Manager,
// and adds the Register/Start/Shutdown/Reset helpers defined in this file.
type Manager struct {
*Client
}
// Register wires the callback f to every key stored under the etcd prefix key.
//
// Parameters:
//   - key: prefix handed to GetValueWithPrefix for the initial pull and to
//     GoWatch for change notifications.
//   - msgType: sample value (or pointer to one) of the message type stored
//     under key. A fresh value of that type is allocated for every entry and
//     must implement proto.Message; entries for which it does not are logged
//     and skipped. An untyped nil msgType takes the same "not proto message"
//     path instead of panicking.
//   - f: called once per key during the initial pull (isInit == true, with a
//     synthetic PUT event that carries no Kv) and once per watch event
//     afterwards (isInit == false). data is the decoded message, or nil for
//     delete events.
//
// This method only builds the pull and watch closures and hands them to
// AddFunc.
func (this *Manager) Register(key string, msgType interface{}, f func(completeKey string, isInit bool, event *clientv3.Event, data interface{})) {
	// Resolve the element type once. A pointer sample is unwrapped so that
	// reflect.New always yields a pointer to the underlying message type.
	msgTp := reflect.TypeOf(msgType)
	if msgTp != nil && msgTp.Kind() == reflect.Ptr {
		msgTp = msgTp.Elem()
	}
	newMessage := func() interface{} {
		if msgTp == nil {
			// reflect.TypeOf(nil) is nil and reflect.New(nil) panics; return
			// nil so the proto.Message assertion below fails cleanly.
			return nil
		}
		return reflect.New(msgTp).Interface()
	}

	// initFunc pulls every value under the prefix, feeds each one to f and
	// returns the revision reported in the response header, or -1 when the
	// fetch failed or the response has no header.
	initFunc := func() int64 {
		logger.Logger.Info("ETCD 拉取数据:", key)
		res, err := this.GetValueWithPrefix(key)
		if err != nil {
			logger.Logger.Errorf("ETCD get WithPrefix(%v) panic:%v", key, err)
			return -1
		}
		// Range over the pairs actually returned. res.Count is the number of
		// keys in the range and can exceed len(res.Kvs) (e.g. a limited
		// range), which would make res.Kvs[i] panic.
		for _, kv := range res.Kvs {
			v, ok := newMessage().(proto.Message)
			if !ok {
				logger.Logger.Errorf("ETCD %v error: not proto message", key)
				continue
			}
			if err := proto.Unmarshal(kv.Value, v); err != nil {
				logger.Logger.Errorf("ETCD %v unmarshal error:%v", key, err)
				continue
			}
			logger.Logger.Tracef("ETCD 拉取成功 %v ==> %v", string(kv.Key), v)
			// Synthetic event for pulled data; EventTypePut is the zero value
			// the original literal used.
			event := &clientv3.Event{
				Type: clientv3.EventTypePut,
			}
			f(string(kv.Key), true, event, v)
		}
		if res.Header != nil {
			return res.Header.Revision
		}
		return -1
	}

	// watchFunc watches the prefix for data changes starting at revision and
	// forwards PUT and DELETE events to f.
	watchFunc := func(ctx context.Context, revision int64) {
		this.GoWatch(ctx, revision, key, func(res clientv3.WatchResponse) error {
			for _, ev := range res.Events {
				switch ev.Type {
				case clientv3.EventTypePut:
					v, ok := newMessage().(proto.Message)
					if !ok {
						logger.Logger.Errorf("ETCD %v error: not proto message", string(ev.Kv.Key))
						continue
					}
					if err := proto.Unmarshal(ev.Kv.Value, v); err != nil {
						logger.Logger.Errorf("etcd unmarshal(%v) error:%v", string(ev.Kv.Key), err)
						continue
					}
					logger.Logger.Tracef("ETCD 更新事件 %v ==> %v", string(ev.Kv.Key), v)
					f(string(ev.Kv.Key), false, ev, v)
				case clientv3.EventTypeDelete:
					// Deletes carry no payload, so data is nil.
					logger.Logger.Tracef("ETCD 删除事件 %v", string(ev.Kv.Key))
					f(string(ev.Kv.Key), false, ev, nil)
				}
			}
			return nil
		})
	}

	this.AddFunc(initFunc, watchFunc)
}
// Start opens the etcd connection using the "etcdurl", "etcduser" and
// "etcdpwd" entries of common.CustomConfig and then calls ReInitAndWatchAll.
//
// A failure from Open is logged at error level; ReInitAndWatchAll is still
// called afterwards, as before.
func (this *Manager) Start() {
	urls := common.CustomConfig.GetStrings("etcdurl")
	user := common.CustomConfig.GetString("etcduser")
	pwd := common.CustomConfig.GetString("etcdpwd")

	// The password is deliberately left out of the log line: credentials must
	// not end up in log files.
	logger.Logger.Infof("EtcdClient开始连接url:%v;etcduser:%v", urls, user)

	// NOTE(review): time.Minute is presumably a dial/connect timeout; confirm
	// against Client.Open.
	if err := this.Open(urls, user, pwd, time.Minute); err != nil {
		logger.Logger.Errorf("Manager.Open err:%v", err)
	}
	this.ReInitAndWatchAll()
}
// Shutdown closes the manager by calling Close.
func (this *Manager) Shutdown() {
this.Close()
}
// Reset restarts the manager: it calls Close and then Start, which reopens
// the connection from the current configuration and calls ReInitAndWatchAll.
func (this *Manager) Reset() {
this.Close()
this.Start()
}
// Register registers an etcd watch callback on the package-level manager.
//
// key: the key to watch.
// msgType: the type of the data stored under key.
// f: callback invoked on data changes. completeKey is the full key; isInit is
// true for data fetched by the first, active pull; event describes the event
// type; data is the already-deserialized value, of type msgType.
func Register(key string, msgType interface{}, f func(completeKey string, isInit bool, event *clientv3.Event, data interface{})) {
mgr.Register(key, msgType, f)
}
// Reset calls Reset on the package-level manager.
func Reset() {
mgr.Reset()
}