package etcd import ( "context" "encoding/json" "reflect" "go.etcd.io/etcd/client/v3" "mongo.games.com/goserver/core" "mongo.games.com/goserver/core/basic" "mongo.games.com/goserver/core/etcd" "mongo.games.com/goserver/core/logger" "mongo.games.com/game/proto" ) // Register 注册etcd监听方法 // key:监听的key // msgType:数据类型 // f:数据变更回调方法, completeKey:完整键, isInit:第一次主动拉取数据,event:事件类型, data:已经反序列化的数据,类型为msgType,是指针类型 // **isInit** 为true时,表示是第一次主动拉取数据,此时允许耗时操作。为false时,表示是监听到数据变更,此时不允许耗时操作。 func Register(key string, msgType interface{}, f func(ctx context.Context, completeKey string, isInit bool, event *clientv3.Event, data interface{})) { createFunc := func() interface{} { tp := reflect.TypeOf(msgType) if tp.Kind() == reflect.Ptr { tp = tp.Elem() } return reflect.New(tp).Interface() } initFunc := func(ctx context.Context, res *clientv3.GetResponse) { if res == nil { return } f := func() { for i := int64(0); i < res.Count; i++ { data := createFunc() switch d := data.(type) { case proto.Message: err := proto.Unmarshal(res.Kvs[i].Value, d) if err != nil { logger.Logger.Errorf("ETCD %v proto.Unmarshal error:%v", key, err) continue } logger.Logger.Debugf("ETCD 拉取成功 %v ==> %v", string(res.Kvs[i].Key), d) event := &clientv3.Event{ Type: 0, } f(context.TODO(), string(res.Kvs[i].Key), true, event, d) default: err := json.Unmarshal(res.Kvs[i].Value, d) if err != nil { logger.Logger.Errorf("ETCD %v josn.Unmarshal error:%v", key, err) continue } logger.Logger.Debugf("ETCD 拉取成功 %v ==> %v", string(res.Kvs[i].Key), d) event := &clientv3.Event{ Type: 0, } f(context.TODO(), string(res.Kvs[i].Key), true, event, d) } } } obj := core.CoreObject() if obj == nil { f() return } obj.SendCommand(basic.CommandWrapper(func(*basic.Object) error { f() return nil }), true) } // 监控数据变动 watchFunc := func(ctx context.Context, res *clientv3.WatchResponse) { if res == nil { return } f := func() { for _, ev := range res.Events { switch ev.Type { case clientv3.EventTypePut: data := createFunc() switch d := data.(type) { case proto.Message: err := proto.Unmarshal(ev.Kv.Value, d) if err != nil { logger.Logger.Errorf("ETCD %v proto.Unmarshal error:%v", key, err) continue } logger.Logger.Tracef("ETCD 更新事件 %v ==> %v", string(ev.Kv.Key), d) f(ctx, string(ev.Kv.Key), false, ev, d) default: err := json.Unmarshal(ev.Kv.Value, d) if err != nil { logger.Logger.Errorf("ETCD %v josn.Unmarshal error:%v", key, err) continue } logger.Logger.Tracef("ETCD 更新事件 %v ==> %v", string(ev.Kv.Key), d) f(ctx, string(ev.Kv.Key), false, ev, d) } case clientv3.EventTypeDelete: logger.Logger.Tracef("ETCD 删除事件 %v", string(ev.Kv.Key)) f(ctx, string(ev.Kv.Key), false, ev, nil) } } } obj := core.CoreObject() if obj == nil { f() return } obj.SendCommand(basic.CommandWrapper(func(*basic.Object) error { f() return nil }), true) } etcd.AddFunc(key, initFunc, watchFunc) }