game_sync/etcd/register.go

122 lines
3.4 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package etcd
import (
"context"
"encoding/json"
"reflect"
"go.etcd.io/etcd/client/v3"
"mongo.games.com/goserver/core"
"mongo.games.com/goserver/core/basic"
"mongo.games.com/goserver/core/etcd"
"mongo.games.com/goserver/core/logger"
"mongo.games.com/game/proto"
)
// Register installs an initial-load handler and a watch handler for an etcd
// key by passing them to etcd.AddFunc.
//
// key is the etcd key to listen on.
//
// msgType is a sample value (or pointer to a value) of the type stored under
// key. For every value that has to be decoded, a fresh pointer to that type is
// allocated; it is decoded with proto.Unmarshal when it is a proto.Message and
// with json.Unmarshal otherwise.
//
// f is the data-change callback:
//   - completeKey: the full key of the entry.
//   - isInit: true when the entry comes from the first, actively pulled
//     snapshot; false when it comes from a watch notification.
//   - event: the watch event as delivered; for the initial pull it is a
//     synthetic event whose Type is 0.
//   - data: the decoded value, a pointer to msgType's underlying type; nil for
//     delete events.
//
// When isInit is true, time-consuming work is allowed inside f. When isInit is
// false (a watched change), f must not perform time-consuming work.
//
// Entries that fail to decode are logged and skipped. Callbacks are sent as a
// command to core.CoreObject() when it is non-nil and are run inline otherwise.
func Register(key string, msgType interface{}, f func(ctx context.Context, completeKey string, isInit bool, event *clientv3.Event, data interface{})) {
	// newMsg allocates a zero value of msgType's underlying (non-pointer) type
	// and returns a pointer to it.
	newMsg := func() interface{} {
		tp := reflect.TypeOf(msgType)
		if tp.Kind() == reflect.Ptr {
			tp = tp.Elem()
		}
		return reflect.New(tp).Interface()
	}

	// decode unmarshals raw into a freshly allocated message. On failure it
	// logs the error and reports false so the caller can skip the entry.
	decode := func(raw []byte) (interface{}, bool) {
		data := newMsg()
		if msg, ok := data.(proto.Message); ok {
			if err := proto.Unmarshal(raw, msg); err != nil {
				logger.Logger.Errorf("ETCD %v proto.Unmarshal error:%v", key, err)
				return nil, false
			}
			return data, true
		}
		if err := json.Unmarshal(raw, data); err != nil {
			logger.Logger.Errorf("ETCD %v json.Unmarshal error:%v", key, err)
			return nil, false
		}
		return data, true
	}

	// dispatch runs task through the core object's command queue when a core
	// object exists, and inline otherwise.
	dispatch := func(task func()) {
		obj := core.CoreObject()
		if obj == nil {
			task()
			return
		}
		obj.SendCommand(basic.CommandWrapper(func(*basic.Object) error {
			task()
			return nil
		}), true)
	}

	// Initial pull: deliver every key/value currently stored under key.
	initFunc := func(ctx context.Context, res *clientv3.GetResponse) {
		if res == nil {
			return
		}
		dispatch(func() {
			// Iterate over Kvs rather than counting up to res.Count: Count is
			// the number of keys in the requested range and may exceed
			// len(res.Kvs), in which case indexing by Count would panic.
			for _, kv := range res.Kvs {
				data, ok := decode(kv.Value)
				if !ok {
					continue
				}
				logger.Logger.Tracef("ETCD 拉取成功 %v ==> %v", string(kv.Key), data)
				f(context.TODO(), string(kv.Key), true, &clientv3.Event{Type: 0}, data)
			}
		})
	}

	// Watch: deliver put and delete events as they arrive.
	watchFunc := func(ctx context.Context, res *clientv3.WatchResponse) {
		if res == nil {
			return
		}
		dispatch(func() {
			for _, ev := range res.Events {
				switch ev.Type {
				case clientv3.EventTypePut:
					data, ok := decode(ev.Kv.Value)
					if !ok {
						continue
					}
					logger.Logger.Tracef("ETCD 更新事件 %v ==> %v", string(ev.Kv.Key), data)
					f(ctx, string(ev.Kv.Key), false, ev, data)
				case clientv3.EventTypeDelete:
					// Deletions carry no value, so the callback gets nil data.
					logger.Logger.Tracef("ETCD 删除事件 %v", string(ev.Kv.Key))
					f(ctx, string(ev.Kv.Key), false, ev, nil)
				}
			}
		})
	}

	etcd.AddFunc(key, initFunc, watchFunc)
}