no message

This commit is contained in:
sk 2024-04-19 09:57:48 +08:00
parent 4fb1ec47b9
commit aadfc98306
6 changed files with 68 additions and 63 deletions

View File

@ -1,10 +1,10 @@
package main package main
import ( import (
"context"
"strings" "strings"
"go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3"
"mongo.games.com/goserver/core/logger" "mongo.games.com/goserver/core/logger"
"mongo.games.com/game/dbproxy/mongo" "mongo.games.com/game/dbproxy/mongo"
@ -13,7 +13,7 @@ import (
) )
func init() { func init() {
etcd.Register(etcd.ETCDKEY_SYS_PLT_DBCFG_PREFIX, webapi.PlatformDbConfig{}, func(completeKey string, isInit bool, event *clientv3.Event, data interface{}) { etcd.Register(etcd.ETCDKEY_SYS_PLT_DBCFG_PREFIX, webapi.PlatformDbConfig{}, func(ctx context.Context, completeKey string, isInit bool, event *clientv3.Event, data interface{}) {
if event.Type == clientv3.EventTypeDelete { if event.Type == clientv3.EventTypeDelete {
return return
} }

View File

@ -31,21 +31,21 @@ type Client struct {
closed bool closed bool
} }
func (this *Client) IsClosed() bool { func (c *Client) IsClosed() bool {
return this.closed return c.closed
} }
func (this *Client) Ctx() context.Context { func (c *Client) Ctx() context.Context {
if this.cli != nil { if c.cli != nil {
return this.cli.Ctx() return c.cli.Ctx()
} }
return context.TODO() return context.TODO()
} }
func (this *Client) Open(etcdUrl []string, userName, passWord string, dialTimeout time.Duration) error { func (c *Client) Open(etcdUrl []string, userName, passWord string, dialTimeout time.Duration) error {
var err error var err error
this.cli, err = clientv3.New(clientv3.Config{ c.cli, err = clientv3.New(clientv3.Config{
Endpoints: etcdUrl, Endpoints: etcdUrl,
Username: userName, Username: userName,
Password: passWord, Password: passWord,
@ -59,22 +59,22 @@ func (this *Client) Open(etcdUrl []string, userName, passWord string, dialTimeou
return err return err
} }
this.closed = false c.closed = false
return err return err
} }
func (this *Client) Close() error { func (c *Client) Close() error {
logger.Logger.Warn("EtcdClient.close") logger.Logger.Warn("EtcdClient.close")
this.closed = true c.closed = true
if this.cli != nil { if c.cli != nil {
return this.cli.Close() return c.cli.Close()
} }
return nil return nil
} }
// PutValue 添加键值对 // PutValue 添加键值对
func (this *Client) PutValue(key, value string) (*clientv3.PutResponse, error) { func (c *Client) PutValue(key, value string) (*clientv3.PutResponse, error) {
resp, err := this.cli.Put(context.TODO(), key, value) resp, err := c.cli.Put(context.TODO(), key, value)
if err != nil { if err != nil {
logger.Logger.Warnf("EtcdClient.PutValue(%v,%v) error:%v", key, value, err) logger.Logger.Warnf("EtcdClient.PutValue(%v,%v) error:%v", key, value, err)
} }
@ -82,8 +82,8 @@ func (this *Client) PutValue(key, value string) (*clientv3.PutResponse, error) {
} }
// GetValue 查询 // GetValue 查询
func (this *Client) GetValue(key string) (*clientv3.GetResponse, error) { func (c *Client) GetValue(key string) (*clientv3.GetResponse, error) {
resp, err := this.cli.Get(context.TODO(), key) resp, err := c.cli.Get(context.TODO(), key)
if err != nil { if err != nil {
logger.Logger.Warnf("EtcdClient.GetValue(%v) error:%v", key, err) logger.Logger.Warnf("EtcdClient.GetValue(%v) error:%v", key, err)
} }
@ -91,8 +91,8 @@ func (this *Client) GetValue(key string) (*clientv3.GetResponse, error) {
} }
// DelValue 返回删除了几条数据 // DelValue 返回删除了几条数据
func (this *Client) DelValue(key string) (*clientv3.DeleteResponse, error) { func (c *Client) DelValue(key string) (*clientv3.DeleteResponse, error) {
res, err := this.cli.Delete(context.TODO(), key) res, err := c.cli.Delete(context.TODO(), key)
if err != nil { if err != nil {
logger.Logger.Warnf("EtcdClient.DelValue(%v) error:%v", key, err) logger.Logger.Warnf("EtcdClient.DelValue(%v) error:%v", key, err)
} }
@ -100,8 +100,8 @@ func (this *Client) DelValue(key string) (*clientv3.DeleteResponse, error) {
} }
// DelValueWithPrefix 按照前缀删除 // DelValueWithPrefix 按照前缀删除
func (this *Client) DelValueWithPrefix(prefix string) (*clientv3.DeleteResponse, error) { func (c *Client) DelValueWithPrefix(prefix string) (*clientv3.DeleteResponse, error) {
res, err := this.cli.Delete(context.TODO(), prefix, clientv3.WithPrefix()) res, err := c.cli.Delete(context.TODO(), prefix, clientv3.WithPrefix())
if err != nil { if err != nil {
logger.Logger.Warnf("EtcdClient.DelValueWithPrefix(%v) error:%v", prefix, err) logger.Logger.Warnf("EtcdClient.DelValueWithPrefix(%v) error:%v", prefix, err)
} }
@ -109,8 +109,8 @@ func (this *Client) DelValueWithPrefix(prefix string) (*clientv3.DeleteResponse,
} }
// GetValueWithPrefix 获取前缀 // GetValueWithPrefix 获取前缀
func (this *Client) GetValueWithPrefix(prefix string) (*clientv3.GetResponse, error) { func (c *Client) GetValueWithPrefix(prefix string) (*clientv3.GetResponse, error) {
resp, err := this.cli.Get(context.TODO(), prefix, clientv3.WithPrefix()) resp, err := c.cli.Get(context.TODO(), prefix, clientv3.WithPrefix())
if err != nil { if err != nil {
logger.Logger.Warnf("EtcdClient.GetValueWIthPrefix(%v) error:%v", prefix, err) logger.Logger.Warnf("EtcdClient.GetValueWIthPrefix(%v) error:%v", prefix, err)
} }
@ -118,13 +118,13 @@ func (this *Client) GetValueWithPrefix(prefix string) (*clientv3.GetResponse, er
} }
// WatchWithPrefix 监测前缀创建事件 // WatchWithPrefix 监测前缀创建事件
func (this *Client) WatchWithPrefix(prefix string, revision int64) clientv3.WatchChan { func (c *Client) WatchWithPrefix(prefix string, revision int64) clientv3.WatchChan {
if this.cli != nil { if c.cli != nil {
opts := []clientv3.OpOption{clientv3.WithPrefix(), clientv3.WithCreatedNotify()} opts := []clientv3.OpOption{clientv3.WithPrefix(), clientv3.WithCreatedNotify()}
if revision > 0 { if revision > 0 {
opts = append(opts, clientv3.WithRev(revision)) opts = append(opts, clientv3.WithRev(revision))
} }
return this.cli.Watch(clientv3.WithRequireLeader(context.Background()), prefix, opts...) return c.cli.Watch(clientv3.WithRequireLeader(context.Background()), prefix, opts...)
} }
return nil return nil
} }
@ -160,41 +160,41 @@ func (this *Client) WatchWithPrefix(prefix string, revision int64) clientv3.Watc
//} //}
// AddFunc 添加监听函数 // AddFunc 添加监听函数
func (this *Client) AddFunc(initFunc InitFunc, watchFunc WatchFunc) { func (c *Client) AddFunc(initFunc InitFunc, watchFunc WatchFunc) {
funcPair := FuncPair{ funcPair := FuncPair{
initFunc: initFunc, initFunc: initFunc,
watchFunc: watchFunc, watchFunc: watchFunc,
} }
this.functions = append(this.functions, funcPair) c.functions = append(c.functions, funcPair)
} }
// ReInitAndWatchAll 重新监听 // ReInitAndWatchAll 重新监听
func (this *Client) ReInitAndWatchAll() { func (c *Client) ReInitAndWatchAll() {
if this.closed { if c.closed {
return return
} }
oldFunc := this.functions oldFunc := c.functions
this.functions = nil c.functions = nil
for i := 0; i < len(oldFunc); i++ { for i := 0; i < len(oldFunc); i++ {
this.InitAndWatch(oldFunc[i].initFunc, oldFunc[i].watchFunc) c.InitAndWatch(oldFunc[i].initFunc, oldFunc[i].watchFunc)
} }
} }
// InitAndWatch 开始监听 // InitAndWatch 开始监听
func (this *Client) InitAndWatch(initFunc InitFunc, watchFunc WatchFunc) { func (c *Client) InitAndWatch(initFunc InitFunc, watchFunc WatchFunc) {
funcPair := FuncPair{ funcPair := FuncPair{
initFunc: initFunc, initFunc: initFunc,
watchFunc: watchFunc, watchFunc: watchFunc,
} }
this.functions = append(this.functions, funcPair) c.functions = append(c.functions, funcPair)
lastRevision := initFunc() lastRevision := initFunc()
ctx, _ := context.WithCancel(this.cli.Ctx()) ctx, _ := context.WithCancel(c.cli.Ctx())
watchFunc(ctx, lastRevision+1) watchFunc(ctx, lastRevision+1)
} }
// GoWatch 异步监听 // GoWatch 异步监听
func (this *Client) GoWatch(ctx context.Context, revision int64, prefix string, f func(res clientv3.WatchResponse) error) { func (c *Client) GoWatch(ctx context.Context, revision int64, prefix string, f func(res clientv3.WatchResponse) error) {
go func() { go func() {
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
@ -203,10 +203,10 @@ func (this *Client) GoWatch(ctx context.Context, revision int64, prefix string,
logger.Logger.Warnf("etcd watch WithPrefix(%v) quit!!!", prefix) logger.Logger.Warnf("etcd watch WithPrefix(%v) quit!!!", prefix)
}() }()
var times int64 var times int64
for !this.closed { for !c.closed {
times++ times++
logger.Logger.Warnf("etcd watch WithPrefix(%v) base revision %v start[%v]!!!", prefix, revision, times) logger.Logger.Warnf("etcd watch WithPrefix(%v) base revision %v start[%v]!!!", prefix, revision, times)
rch := this.WatchWithPrefix(prefix, revision) rch := c.WatchWithPrefix(prefix, revision)
for { for {
skip := false skip := false
select { select {
@ -214,39 +214,39 @@ func (this *Client) GoWatch(ctx context.Context, revision int64, prefix string,
if !ok { if !ok {
return return
} }
case wresp, ok := <-rch: case resp, ok := <-rch:
if !ok { if !ok {
logger.Logger.Warnf("etcd watch WithPrefix(%v) be closed", prefix) logger.Logger.Warnf("etcd watch WithPrefix(%v) be closed", prefix)
skip = true skip = true
break break
} }
if wresp.Header.Revision > revision { if resp.Header.Revision > revision {
revision = wresp.Header.Revision revision = resp.Header.Revision
} }
if wresp.Canceled { if resp.Canceled {
logger.Logger.Warnf("etcd watch WithPrefix(%v) be closed, reason:%v", prefix, wresp.Err()) logger.Logger.Warnf("etcd watch WithPrefix(%v) be closed, reason:%v", prefix, resp.Err())
skip = true skip = true
break break
} }
if err := wresp.Err(); err != nil { if err := resp.Err(); err != nil {
logger.Logger.Warnf("etcd watch WithPrefix(%v) err:%v", prefix, wresp.Err()) logger.Logger.Warnf("etcd watch WithPrefix(%v) err:%v", prefix, resp.Err())
continue continue
} }
if !model.GameParamData.UseEtcd { if !model.GameParamData.UseEtcd {
continue continue
} }
if len(wresp.Events) == 0 { if len(resp.Events) == 0 {
continue continue
} }
logger.Logger.Tracef("@goWatch %v changed, header:%#v", prefix, wresp.Header) logger.Logger.Tracef("@goWatch %v changed, header:%#v", prefix, resp.Header)
obj := core.CoreObject() obj := core.CoreObject()
if obj != nil { if obj != nil {
func(res clientv3.WatchResponse) { func(res clientv3.WatchResponse) {
obj.SendCommand(basic.CommandWrapper(func(*basic.Object) error { obj.SendCommand(basic.CommandWrapper(func(*basic.Object) error {
return f(res) return f(res)
}), true) }), true)
}(wresp) }(resp)
} }
} }

View File

@ -20,7 +20,7 @@ type Manager struct {
} }
// Register . // Register .
func (this *Manager) Register(key string, msgType interface{}, f func(completeKey string, isInit bool, event *clientv3.Event, data interface{})) { func (this *Manager) Register(key string, msgType interface{}, f func(ctx context.Context, completeKey string, isInit bool, event *clientv3.Event, data interface{})) {
createFunc := func() interface{} { createFunc := func() interface{} {
tp := reflect.TypeOf(msgType) tp := reflect.TypeOf(msgType)
@ -50,7 +50,7 @@ func (this *Manager) Register(key string, msgType interface{}, f func(completeKe
event := &clientv3.Event{ event := &clientv3.Event{
Type: 0, Type: 0,
} }
f(string(res.Kvs[i].Key), true, event, v) f(context.TODO(), string(res.Kvs[i].Key), true, event, v)
} }
if res.Header != nil { if res.Header != nil {
return res.Header.Revision return res.Header.Revision
@ -79,10 +79,10 @@ func (this *Manager) Register(key string, msgType interface{}, f func(completeKe
continue continue
} }
logger.Logger.Tracef("ETCD 更新事件 %v ==> %v", string(ev.Kv.Key), v) logger.Logger.Tracef("ETCD 更新事件 %v ==> %v", string(ev.Kv.Key), v)
f(string(ev.Kv.Key), false, ev, v) f(ctx, string(ev.Kv.Key), false, ev, v)
case clientv3.EventTypeDelete: case clientv3.EventTypeDelete:
logger.Logger.Tracef("ETCD 删除事件 %v", string(ev.Kv.Key)) logger.Logger.Tracef("ETCD 删除事件 %v", string(ev.Kv.Key))
f(string(ev.Kv.Key), false, ev, nil) f(ctx, string(ev.Kv.Key), false, ev, nil)
} }
} }
return nil return nil
@ -113,8 +113,8 @@ func (this *Manager) Reset() {
// Register 注册etcd监听方法 // Register 注册etcd监听方法
// key:监听的key // key:监听的key
// msgType:数据类型 // msgType:数据类型
// f:数据变更回调方法, completeKey:完整键, isInit:第一次主动拉取数据,event:事件类型, data:已经反序列化的数据类型为msgType // f:数据变更回调方法, completeKey:完整键, isInit:第一次主动拉取数据,event:事件类型, data:已经反序列化的数据类型为msgType,是指针类型
func Register(key string, msgType interface{}, f func(completeKey string, isInit bool, event *clientv3.Event, data interface{})) { func Register(key string, msgType interface{}, f func(ctx context.Context, completeKey string, isInit bool, event *clientv3.Event, data interface{})) {
mgr.Register(key, msgType, f) mgr.Register(key, msgType, f)
} }

View File

@ -1,8 +1,9 @@
package base package base
import ( import (
"go.etcd.io/etcd/client/v3" "context"
"go.etcd.io/etcd/client/v3"
"mongo.games.com/goserver/core/logger" "mongo.games.com/goserver/core/logger"
"mongo.games.com/game/etcd" "mongo.games.com/game/etcd"
@ -21,7 +22,7 @@ func init() {
etcd.Register(etcd.ETCDKEY_ACT_Collect, webapi.WelfareCollectConfig{}, platformConfigEtcd) etcd.Register(etcd.ETCDKEY_ACT_Collect, webapi.WelfareCollectConfig{}, platformConfigEtcd)
} }
func platformConfigEtcd(completeKey string, isInit bool, event *clientv3.Event, data interface{}) { func platformConfigEtcd(ctx context.Context, completeKey string, isInit bool, event *clientv3.Event, data interface{}) {
if event.Type == clientv3.EventTypeDelete { if event.Type == clientv3.EventTypeDelete {
return return
} }

View File

@ -1,6 +1,8 @@
package com package com
import ( import (
"context"
"go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3"
"mongo.games.com/game/etcd" "mongo.games.com/game/etcd"
@ -12,7 +14,7 @@ func init() {
etcd.Register(etcd.ETCDKEY_PLATFORM_PREFIX, webapi.Platform{}, PlatformConfigEtcd) etcd.Register(etcd.ETCDKEY_PLATFORM_PREFIX, webapi.Platform{}, PlatformConfigEtcd)
} }
func PlatformConfigEtcd(completeKey string, isInit bool, event *clientv3.Event, data interface{}) { func PlatformConfigEtcd(ctx context.Context, completeKey string, isInit bool, event *clientv3.Event, data interface{}) {
if event.Type == clientv3.EventTypeDelete { if event.Type == clientv3.EventTypeDelete {
return return
} }

View File

@ -1,6 +1,8 @@
package main package main
import ( import (
"context"
"go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3"
"mongo.games.com/goserver/core/logger" "mongo.games.com/goserver/core/logger"
@ -21,7 +23,7 @@ func init() {
//func ExchangeShopList(completeKey string, isInit bool, event *clientv3.Event, data interface{}) {} //func ExchangeShopList(completeKey string, isInit bool, event *clientv3.Event, data interface{}) {}
func WelfareCollectConfig(completeKey string, isInit bool, event *clientv3.Event, data interface{}) { func WelfareCollectConfig(ctx context.Context, completeKey string, isInit bool, event *clientv3.Event, data interface{}) {
if event.Type == clientv3.EventTypeDelete { if event.Type == clientv3.EventTypeDelete {
return return
} }
@ -33,7 +35,7 @@ func WelfareCollectConfig(completeKey string, isInit bool, event *clientv3.Event
WelfareMgrSington.UpdateCollectConfig(config) WelfareMgrSington.UpdateCollectConfig(config)
} }
func ItemShopList(completeKey string, isInit bool, event *clientv3.Event, data interface{}) { func ItemShopList(ctx context.Context, completeKey string, isInit bool, event *clientv3.Event, data interface{}) {
if event.Type == clientv3.EventTypeDelete { if event.Type == clientv3.EventTypeDelete {
return return
} }
@ -45,14 +47,14 @@ func ItemShopList(completeKey string, isInit bool, event *clientv3.Event, data i
ShopMgrSington.UpdateItemShop(config) ShopMgrSington.UpdateItemShop(config)
} }
func PlatformConfigEtcd(completeKey string, isInit bool, event *clientv3.Event, data interface{}) { func PlatformConfigEtcd(ctx context.Context, completeKey string, isInit bool, event *clientv3.Event, data interface{}) {
if event.Type == clientv3.EventTypeDelete { if event.Type == clientv3.EventTypeDelete {
return return
} }
PlatformMgrSingleton.UpdateConfig(data) PlatformMgrSingleton.UpdateConfig(data)
} }
func ExchangeShopList(completeKey string, isInit bool, event *clientv3.Event, data interface{}) { func ExchangeShopList(ctx context.Context, completeKey string, isInit bool, event *clientv3.Event, data interface{}) {
if event.Type == clientv3.EventTypeDelete { if event.Type == clientv3.EventTypeDelete {
return return
} }