diff --git a/dbproxy/etcd.go b/dbproxy/etcd.go index 429e76f..b62d087 100644 --- a/dbproxy/etcd.go +++ b/dbproxy/etcd.go @@ -1,10 +1,10 @@ package main import ( + "context" "strings" "go.etcd.io/etcd/client/v3" - "mongo.games.com/goserver/core/logger" "mongo.games.com/game/dbproxy/mongo" @@ -13,7 +13,7 @@ import ( ) func init() { - etcd.Register(etcd.ETCDKEY_SYS_PLT_DBCFG_PREFIX, webapi.PlatformDbConfig{}, func(completeKey string, isInit bool, event *clientv3.Event, data interface{}) { + etcd.Register(etcd.ETCDKEY_SYS_PLT_DBCFG_PREFIX, webapi.PlatformDbConfig{}, func(ctx context.Context, completeKey string, isInit bool, event *clientv3.Event, data interface{}) { if event.Type == clientv3.EventTypeDelete { return } diff --git a/etcd/client.go b/etcd/client.go index 5b090f1..c4ba837 100644 --- a/etcd/client.go +++ b/etcd/client.go @@ -31,21 +31,21 @@ type Client struct { closed bool } -func (this *Client) IsClosed() bool { - return this.closed +func (c *Client) IsClosed() bool { + return c.closed } -func (this *Client) Ctx() context.Context { - if this.cli != nil { - return this.cli.Ctx() +func (c *Client) Ctx() context.Context { + if c.cli != nil { + return c.cli.Ctx() } return context.TODO() } -func (this *Client) Open(etcdUrl []string, userName, passWord string, dialTimeout time.Duration) error { +func (c *Client) Open(etcdUrl []string, userName, passWord string, dialTimeout time.Duration) error { var err error - this.cli, err = clientv3.New(clientv3.Config{ + c.cli, err = clientv3.New(clientv3.Config{ Endpoints: etcdUrl, Username: userName, Password: passWord, @@ -59,22 +59,22 @@ func (this *Client) Open(etcdUrl []string, userName, passWord string, dialTimeou return err } - this.closed = false + c.closed = false return err } -func (this *Client) Close() error { +func (c *Client) Close() error { logger.Logger.Warn("EtcdClient.close") - this.closed = true - if this.cli != nil { - return this.cli.Close() + c.closed = true + if c.cli != nil { + return c.cli.Close() } return nil } // PutValue 添加键值对 -func (this *Client) PutValue(key, value string) (*clientv3.PutResponse, error) { - resp, err := this.cli.Put(context.TODO(), key, value) +func (c *Client) PutValue(key, value string) (*clientv3.PutResponse, error) { + resp, err := c.cli.Put(context.TODO(), key, value) if err != nil { logger.Logger.Warnf("EtcdClient.PutValue(%v,%v) error:%v", key, value, err) } @@ -82,8 +82,8 @@ func (this *Client) PutValue(key, value string) (*clientv3.PutResponse, error) { } // GetValue 查询 -func (this *Client) GetValue(key string) (*clientv3.GetResponse, error) { - resp, err := this.cli.Get(context.TODO(), key) +func (c *Client) GetValue(key string) (*clientv3.GetResponse, error) { + resp, err := c.cli.Get(context.TODO(), key) if err != nil { logger.Logger.Warnf("EtcdClient.GetValue(%v) error:%v", key, err) } @@ -91,8 +91,8 @@ func (this *Client) GetValue(key string) (*clientv3.GetResponse, error) { } // DelValue 返回删除了几条数据 -func (this *Client) DelValue(key string) (*clientv3.DeleteResponse, error) { - res, err := this.cli.Delete(context.TODO(), key) +func (c *Client) DelValue(key string) (*clientv3.DeleteResponse, error) { + res, err := c.cli.Delete(context.TODO(), key) if err != nil { logger.Logger.Warnf("EtcdClient.DelValue(%v) error:%v", key, err) } @@ -100,8 +100,8 @@ func (this *Client) DelValue(key string) (*clientv3.DeleteResponse, error) { } // DelValueWithPrefix 按照前缀删除 -func (this *Client) DelValueWithPrefix(prefix string) (*clientv3.DeleteResponse, error) { - res, err := this.cli.Delete(context.TODO(), prefix, clientv3.WithPrefix()) +func (c *Client) DelValueWithPrefix(prefix string) (*clientv3.DeleteResponse, error) { + res, err := c.cli.Delete(context.TODO(), prefix, clientv3.WithPrefix()) if err != nil { logger.Logger.Warnf("EtcdClient.DelValueWithPrefix(%v) error:%v", prefix, err) } @@ -109,8 +109,8 @@ func (this *Client) DelValueWithPrefix(prefix string) (*clientv3.DeleteResponse, } // GetValueWithPrefix 获取前缀 -func (this *Client) GetValueWithPrefix(prefix string) (*clientv3.GetResponse, error) { - resp, err := this.cli.Get(context.TODO(), prefix, clientv3.WithPrefix()) +func (c *Client) GetValueWithPrefix(prefix string) (*clientv3.GetResponse, error) { + resp, err := c.cli.Get(context.TODO(), prefix, clientv3.WithPrefix()) if err != nil { logger.Logger.Warnf("EtcdClient.GetValueWIthPrefix(%v) error:%v", prefix, err) } @@ -118,13 +118,13 @@ func (this *Client) GetValueWithPrefix(prefix string) (*clientv3.GetResponse, er } // WatchWithPrefix 监测前缀创建事件 -func (this *Client) WatchWithPrefix(prefix string, revision int64) clientv3.WatchChan { - if this.cli != nil { +func (c *Client) WatchWithPrefix(prefix string, revision int64) clientv3.WatchChan { + if c.cli != nil { opts := []clientv3.OpOption{clientv3.WithPrefix(), clientv3.WithCreatedNotify()} if revision > 0 { opts = append(opts, clientv3.WithRev(revision)) } - return this.cli.Watch(clientv3.WithRequireLeader(context.Background()), prefix, opts...) + return c.cli.Watch(clientv3.WithRequireLeader(context.Background()), prefix, opts...) } return nil } @@ -160,41 +160,41 @@ func (this *Client) WatchWithPrefix(prefix string, revision int64) clientv3.Watc //} // AddFunc 添加监听函数 -func (this *Client) AddFunc(initFunc InitFunc, watchFunc WatchFunc) { +func (c *Client) AddFunc(initFunc InitFunc, watchFunc WatchFunc) { funcPair := FuncPair{ initFunc: initFunc, watchFunc: watchFunc, } - this.functions = append(this.functions, funcPair) + c.functions = append(c.functions, funcPair) } // ReInitAndWatchAll 重新监听 -func (this *Client) ReInitAndWatchAll() { - if this.closed { +func (c *Client) ReInitAndWatchAll() { + if c.closed { return } - oldFunc := this.functions - this.functions = nil + oldFunc := c.functions + c.functions = nil for i := 0; i < len(oldFunc); i++ { - this.InitAndWatch(oldFunc[i].initFunc, oldFunc[i].watchFunc) + c.InitAndWatch(oldFunc[i].initFunc, oldFunc[i].watchFunc) } } // InitAndWatch 开始监听 -func (this *Client) InitAndWatch(initFunc InitFunc, watchFunc WatchFunc) { +func (c *Client) InitAndWatch(initFunc InitFunc, watchFunc WatchFunc) { funcPair := FuncPair{ initFunc: initFunc, watchFunc: watchFunc, } - this.functions = append(this.functions, funcPair) + c.functions = append(c.functions, funcPair) lastRevision := initFunc() - ctx, _ := context.WithCancel(this.cli.Ctx()) + ctx, _ := context.WithCancel(c.cli.Ctx()) watchFunc(ctx, lastRevision+1) } // GoWatch 异步监听 -func (this *Client) GoWatch(ctx context.Context, revision int64, prefix string, f func(res clientv3.WatchResponse) error) { +func (c *Client) GoWatch(ctx context.Context, revision int64, prefix string, f func(res clientv3.WatchResponse) error) { go func() { defer func() { if err := recover(); err != nil { @@ -203,10 +203,10 @@ func (this *Client) GoWatch(ctx context.Context, revision int64, prefix string, logger.Logger.Warnf("etcd watch WithPrefix(%v) quit!!!", prefix) }() var times int64 - for !this.closed { + for !c.closed { times++ logger.Logger.Warnf("etcd watch WithPrefix(%v) base revision %v start[%v]!!!", prefix, revision, times) - rch := this.WatchWithPrefix(prefix, revision) + rch := c.WatchWithPrefix(prefix, revision) for { skip := false select { @@ -214,39 +214,39 @@ func (this *Client) GoWatch(ctx context.Context, revision int64, prefix string, if !ok { return } - case wresp, ok := <-rch: + case resp, ok := <-rch: if !ok { logger.Logger.Warnf("etcd watch WithPrefix(%v) be closed", prefix) skip = true break } - if wresp.Header.Revision > revision { - revision = wresp.Header.Revision + if resp.Header.Revision > revision { + revision = resp.Header.Revision } - if wresp.Canceled { - logger.Logger.Warnf("etcd watch WithPrefix(%v) be closed, reason:%v", prefix, wresp.Err()) + if resp.Canceled { + logger.Logger.Warnf("etcd watch WithPrefix(%v) be closed, reason:%v", prefix, resp.Err()) skip = true break } - if err := wresp.Err(); err != nil { - logger.Logger.Warnf("etcd watch WithPrefix(%v) err:%v", prefix, wresp.Err()) + if err := resp.Err(); err != nil { + logger.Logger.Warnf("etcd watch WithPrefix(%v) err:%v", prefix, resp.Err()) continue } if !model.GameParamData.UseEtcd { continue } - if len(wresp.Events) == 0 { + if len(resp.Events) == 0 { continue } - logger.Logger.Tracef("@goWatch %v changed, header:%#v", prefix, wresp.Header) + logger.Logger.Tracef("@goWatch %v changed, header:%#v", prefix, resp.Header) obj := core.CoreObject() if obj != nil { func(res clientv3.WatchResponse) { obj.SendCommand(basic.CommandWrapper(func(*basic.Object) error { return f(res) }), true) - }(wresp) + }(resp) } } diff --git a/etcd/manager.go b/etcd/manager.go index 0a5765d..3dc6d98 100644 --- a/etcd/manager.go +++ b/etcd/manager.go @@ -20,7 +20,7 @@ type Manager struct { } // Register . -func (this *Manager) Register(key string, msgType interface{}, f func(completeKey string, isInit bool, event *clientv3.Event, data interface{})) { +func (this *Manager) Register(key string, msgType interface{}, f func(ctx context.Context, completeKey string, isInit bool, event *clientv3.Event, data interface{})) { createFunc := func() interface{} { tp := reflect.TypeOf(msgType) @@ -50,7 +50,7 @@ func (this *Manager) Register(key string, msgType interface{}, f func(completeKe event := &clientv3.Event{ Type: 0, } - f(string(res.Kvs[i].Key), true, event, v) + f(context.TODO(), string(res.Kvs[i].Key), true, event, v) } if res.Header != nil { return res.Header.Revision @@ -79,10 +79,10 @@ func (this *Manager) Register(key string, msgType interface{}, f func(completeKe continue } logger.Logger.Tracef("ETCD 更新事件 %v ==> %v", string(ev.Kv.Key), v) - f(string(ev.Kv.Key), false, ev, v) + f(ctx, string(ev.Kv.Key), false, ev, v) case clientv3.EventTypeDelete: logger.Logger.Tracef("ETCD 删除事件 %v", string(ev.Kv.Key)) - f(string(ev.Kv.Key), false, ev, nil) + f(ctx, string(ev.Kv.Key), false, ev, nil) } } return nil @@ -113,8 +113,8 @@ func (this *Manager) Reset() { // Register 注册etcd监听方法 // key:监听的key // msgType:数据类型 -// f:数据变更回调方法, completeKey:完整键, isInit:第一次主动拉取数据,event:事件类型, data:已经反序列化的数据,类型为msgType -func Register(key string, msgType interface{}, f func(completeKey string, isInit bool, event *clientv3.Event, data interface{})) { +// f:数据变更回调方法, completeKey:完整键, isInit:第一次主动拉取数据,event:事件类型, data:已经反序列化的数据,类型为msgType,是指针类型 +func Register(key string, msgType interface{}, f func(ctx context.Context, completeKey string, isInit bool, event *clientv3.Event, data interface{})) { mgr.Register(key, msgType, f) } diff --git a/gamesrv/base/etcd.go b/gamesrv/base/etcd.go index 45b2c46..be7944e 100644 --- a/gamesrv/base/etcd.go +++ b/gamesrv/base/etcd.go @@ -1,8 +1,9 @@ package base import ( - "go.etcd.io/etcd/client/v3" + "context" + "go.etcd.io/etcd/client/v3" "mongo.games.com/goserver/core/logger" "mongo.games.com/game/etcd" @@ -21,7 +22,7 @@ func init() { etcd.Register(etcd.ETCDKEY_ACT_Collect, webapi.WelfareCollectConfig{}, platformConfigEtcd) } -func platformConfigEtcd(completeKey string, isInit bool, event *clientv3.Event, data interface{}) { +func platformConfigEtcd(ctx context.Context, completeKey string, isInit bool, event *clientv3.Event, data interface{}) { if event.Type == clientv3.EventTypeDelete { return } diff --git a/ranksrv/com/etcd.go b/ranksrv/com/etcd.go index 1da2b97..7fe6a23 100644 --- a/ranksrv/com/etcd.go +++ b/ranksrv/com/etcd.go @@ -1,6 +1,8 @@ package com import ( + "context" + "go.etcd.io/etcd/client/v3" "mongo.games.com/game/etcd" @@ -12,7 +14,7 @@ func init() { etcd.Register(etcd.ETCDKEY_PLATFORM_PREFIX, webapi.Platform{}, PlatformConfigEtcd) } -func PlatformConfigEtcd(completeKey string, isInit bool, event *clientv3.Event, data interface{}) { +func PlatformConfigEtcd(ctx context.Context, completeKey string, isInit bool, event *clientv3.Event, data interface{}) { if event.Type == clientv3.EventTypeDelete { return } diff --git a/worldsrv/etcd.go b/worldsrv/etcd.go index 09a7c37..1a2edb9 100644 --- a/worldsrv/etcd.go +++ b/worldsrv/etcd.go @@ -1,6 +1,8 @@ package main import ( + "context" + "go.etcd.io/etcd/client/v3" "mongo.games.com/goserver/core/logger" @@ -21,7 +23,7 @@ func init() { //func ExchangeShopList(completeKey string, isInit bool, event *clientv3.Event, data interface{}) {} -func WelfareCollectConfig(completeKey string, isInit bool, event *clientv3.Event, data interface{}) { +func WelfareCollectConfig(ctx context.Context, completeKey string, isInit bool, event *clientv3.Event, data interface{}) { if event.Type == clientv3.EventTypeDelete { return } @@ -33,7 +35,7 @@ func WelfareCollectConfig(completeKey string, isInit bool, event *clientv3.Event WelfareMgrSington.UpdateCollectConfig(config) } -func ItemShopList(completeKey string, isInit bool, event *clientv3.Event, data interface{}) { +func ItemShopList(ctx context.Context, completeKey string, isInit bool, event *clientv3.Event, data interface{}) { if event.Type == clientv3.EventTypeDelete { return } @@ -45,14 +47,14 @@ func ItemShopList(completeKey string, isInit bool, event *clientv3.Event, data i ShopMgrSington.UpdateItemShop(config) } -func PlatformConfigEtcd(completeKey string, isInit bool, event *clientv3.Event, data interface{}) { +func PlatformConfigEtcd(ctx context.Context, completeKey string, isInit bool, event *clientv3.Event, data interface{}) { if event.Type == clientv3.EventTypeDelete { return } PlatformMgrSingleton.UpdateConfig(data) } -func ExchangeShopList(completeKey string, isInit bool, event *clientv3.Event, data interface{}) { +func ExchangeShopList(ctx context.Context, completeKey string, isInit bool, event *clientv3.Event, data interface{}) { if event.Type == clientv3.EventTypeDelete { return }