124 lines
3.3 KiB
Go
124 lines
3.3 KiB
Go
package etcd
|
||
|
||
import (
|
||
"context"
|
||
"reflect"
|
||
"time"
|
||
|
||
"go.etcd.io/etcd/client/v3"
|
||
|
||
"mongo.games.com/goserver/core/logger"
|
||
|
||
"mongo.games.com/game/common"
|
||
"mongo.games.com/game/proto"
|
||
)
|
||
|
||
var mgr = &Manager{Client: new(Client)}
|
||
|
||
type Manager struct {
|
||
*Client
|
||
}
|
||
|
||
// Register .
|
||
func (this *Manager) Register(key string, msgType interface{}, f func(completeKey string, isInit bool, event *clientv3.Event, data interface{})) {
|
||
|
||
createFunc := func() interface{} {
|
||
tp := reflect.TypeOf(msgType)
|
||
if tp.Kind() == reflect.Ptr {
|
||
tp = tp.Elem()
|
||
}
|
||
return reflect.New(tp).Interface()
|
||
}
|
||
|
||
initFunc := func() int64 {
|
||
logger.Logger.Info("ETCD 拉取数据:", key)
|
||
res, err := this.GetValueWithPrefix(key)
|
||
if err == nil {
|
||
for i := int64(0); i < res.Count; i++ {
|
||
data := createFunc()
|
||
v, ok := data.(proto.Message)
|
||
if !ok {
|
||
logger.Logger.Errorf("ETCD %v error: not proto message", key)
|
||
continue
|
||
}
|
||
err := proto.Unmarshal(res.Kvs[i].Value, v)
|
||
if err != nil {
|
||
logger.Logger.Errorf("ETCD %v unmarshal error:%v", key, err)
|
||
continue
|
||
}
|
||
logger.Logger.Tracef("ETCD 拉取成功 %v ==> %v", string(res.Kvs[i].Key), v)
|
||
event := &clientv3.Event{
|
||
Type: 0,
|
||
}
|
||
f(string(res.Kvs[i].Key), true, event, v)
|
||
}
|
||
if res.Header != nil {
|
||
return res.Header.Revision
|
||
}
|
||
} else {
|
||
logger.Logger.Errorf("ETCD get WithPrefix(%v) panic:%v", key, err)
|
||
}
|
||
return -1
|
||
}
|
||
|
||
// 监控数据变动
|
||
watchFunc := func(ctx context.Context, revision int64) {
|
||
this.GoWatch(ctx, revision, key, func(res clientv3.WatchResponse) error {
|
||
for _, ev := range res.Events {
|
||
switch ev.Type {
|
||
case clientv3.EventTypePut:
|
||
data := createFunc()
|
||
v, ok := data.(proto.Message)
|
||
if !ok {
|
||
logger.Logger.Errorf("ETCD %v error: not proto message", string(ev.Kv.Key))
|
||
continue
|
||
}
|
||
err := proto.Unmarshal(ev.Kv.Value, v)
|
||
if err != nil {
|
||
logger.Logger.Errorf("etcd unmarshal(%v) error:%v", string(ev.Kv.Key), err)
|
||
continue
|
||
}
|
||
logger.Logger.Tracef("ETCD 更新事件 %v ==> %v", string(ev.Kv.Key), v)
|
||
f(string(ev.Kv.Key), false, ev, v)
|
||
case clientv3.EventTypeDelete:
|
||
logger.Logger.Tracef("ETCD 删除事件 %v", string(ev.Kv.Key))
|
||
f(string(ev.Kv.Key), false, ev, nil)
|
||
}
|
||
}
|
||
return nil
|
||
})
|
||
}
|
||
|
||
this.AddFunc(initFunc, watchFunc)
|
||
}
|
||
|
||
func (this *Manager) Start() {
|
||
logger.Logger.Infof("EtcdClient开始连接url:%v;etcduser:%v;etcdpwd:%v", common.CustomConfig.GetStrings("etcdurl"), common.CustomConfig.GetString("etcduser"), common.CustomConfig.GetString("etcdpwd"))
|
||
err := this.Open(common.CustomConfig.GetStrings("etcdurl"), common.CustomConfig.GetString("etcduser"), common.CustomConfig.GetString("etcdpwd"), time.Minute)
|
||
if err != nil {
|
||
logger.Logger.Tracef("Manager.Open err:%v", err)
|
||
}
|
||
this.ReInitAndWatchAll()
|
||
}
|
||
|
||
func (this *Manager) Shutdown() {
|
||
this.Close()
|
||
}
|
||
|
||
func (this *Manager) Reset() {
|
||
this.Close()
|
||
this.Start()
|
||
}
|
||
|
||
// Register 注册etcd监听方法
|
||
// key:监听的key
|
||
// msgType:数据类型
|
||
// f:数据变更回调方法, completeKey:完整键, isInit:第一次主动拉取数据,event:事件类型, data:已经反序列化的数据,类型为msgType
|
||
func Register(key string, msgType interface{}, f func(completeKey string, isInit bool, event *clientv3.Event, data interface{})) {
|
||
mgr.Register(key, msgType, f)
|
||
}
|
||
|
||
func Reset() {
|
||
mgr.Reset()
|
||
}
|